Best Python code snippet using localstack_python
parsers.py
Source:parsers.py  
...395                    xml_dict[key] = [xml_dict[key], item]396            else:397                xml_dict[key] = item398        return xml_dict399    def _parse_xml_string_to_dom(self, xml_string):400        try:401            parser = ETree.XMLParser(402                target=ETree.TreeBuilder(),403                encoding=self.DEFAULT_ENCODING)404            parser.feed(xml_string)405            root = parser.close()406        except XMLParseError as e:407            raise ResponseParserError(408                "Unable to parse response (%s), "409                "invalid XML received. Further retries may succeed:\n%s" %410                (e, xml_string))411        return root412    def _replace_nodes(self, parsed):413        for key, value in parsed.items():414            if list(value):415                sub_dict = self._build_name_to_xml_node(value)416                parsed[key] = self._replace_nodes(sub_dict)417            else:418                parsed[key] = value.text419        return parsed420    @_text_content421    def _handle_boolean(self, shape, text):422        if text == 'true':423            return True424        else:425            return False426    @_text_content427    def _handle_float(self, shape, text):428        return float(text)429    @_text_content430    def _handle_timestamp(self, shape, text):431        return self._timestamp_parser(text)432    @_text_content433    def _handle_integer(self, shape, text):434        return int(text)435    @_text_content436    def _handle_string(self, shape, text):437        return text438    @_text_content439    def _handle_blob(self, shape, text):440        return self._blob_parser(text)441    _handle_character = _handle_string442    _handle_double = _handle_float443    _handle_long = _handle_integer444class QueryParser(BaseXMLResponseParser):445    def _do_error_parse(self, response, shape):446        xml_contents = response['body']447        root = self._parse_xml_string_to_dom(xml_contents)448        parsed = self._build_name_to_xml_node(root)449        self._replace_nodes(parsed)450        # Once we've converted xml->dict, we need to make one or two451        # more adjustments to extract nested errors and to be consistent452        # with ResponseMetadata for non-error responses:453        # 1. {"Errors": {"Error": {...}}} -> {"Error": {...}}454        # 2. {"RequestId": "id"} -> {"ResponseMetadata": {"RequestId": "id"}}455        if 'Errors' in parsed:456            parsed.update(parsed.pop('Errors'))457        if 'RequestId' in parsed:458            parsed['ResponseMetadata'] = {'RequestId': parsed.pop('RequestId')}459        return parsed460    def _do_modeled_error_parse(self, response, shape):461        return self._parse_body_as_xml(response, shape, inject_metadata=False)462    def _do_parse(self, response, shape):463        return self._parse_body_as_xml(response, shape, inject_metadata=True)464    def _parse_body_as_xml(self, response, shape, inject_metadata=True):465        xml_contents = response['body']466        root = self._parse_xml_string_to_dom(xml_contents)467        parsed = {}468        if shape is not None:469            start = root470            if 'resultWrapper' in shape.serialization:471                start = self._find_result_wrapped_shape(472                    shape.serialization['resultWrapper'],473                    root)474            parsed = self._parse_shape(shape, start)475        if inject_metadata:476            self._inject_response_metadata(root, parsed)477        return parsed478    def _find_result_wrapped_shape(self, element_name, xml_root_node):479        mapping = self._build_name_to_xml_node(xml_root_node)480        return mapping[element_name]481    def _inject_response_metadata(self, node, inject_into):482        mapping = self._build_name_to_xml_node(node)483        child_node = mapping.get('ResponseMetadata')484        if child_node is not None:485            sub_mapping = self._build_name_to_xml_node(child_node)486            for key, value in sub_mapping.items():487                sub_mapping[key] = value.text488            inject_into['ResponseMetadata'] = sub_mapping489class EC2QueryParser(QueryParser):490    def _inject_response_metadata(self, node, inject_into):491        mapping = self._build_name_to_xml_node(node)492        child_node = mapping.get('requestId')493        if child_node is not None:494            inject_into['ResponseMetadata'] = {'RequestId': child_node.text}495    def _do_error_parse(self, response, shape):496        # EC2 errors look like:497        # <Response>498        #   <Errors>499        #     <Error>500        #       <Code>InvalidInstanceID.Malformed</Code>501        #       <Message>Invalid id: "1343124"</Message>502        #     </Error>503        #   </Errors>504        #   <RequestID>12345</RequestID>505        # </Response>506        # This is different from QueryParser in that it's RequestID,507        # not RequestId508        original = super(EC2QueryParser, self)._do_error_parse(response, shape)509        if 'RequestID' in original:510            original['ResponseMetadata'] = {511                'RequestId': original.pop('RequestID')512            }513        return original514    def _get_error_root(self, original_root):515        for child in original_root:516            if self._node_tag(child) == 'Errors':517                for errors_child in child:518                    if self._node_tag(errors_child) == 'Error':519                        return errors_child520        return original_root521class BaseJSONParser(ResponseParser):522    def _handle_structure(self, shape, value):523        final_parsed = {}524        if shape.is_document_type:525            final_parsed = value526        else:527            member_shapes = shape.members528            if value is None:529                # If the comes across the wire as "null" (None in python),530                # we should be returning this unchanged, instead of as an531                # empty dict.532                return None533            final_parsed = {}534            if self._has_unknown_tagged_union_member(shape, value):535                tag = self._get_first_key(value)536                return self._handle_unknown_tagged_union_member(tag)537            for member_name in member_shapes:538                member_shape = member_shapes[member_name]539                json_name = member_shape.serialization.get('name', member_name)540                raw_value = value.get(json_name)541                if raw_value is not None:542                    final_parsed[member_name] = self._parse_shape(543                        member_shapes[member_name],544                        raw_value)545        return final_parsed546    def _handle_map(self, shape, value):547        parsed = {}548        key_shape = shape.key549        value_shape = shape.value550        for key, value in value.items():551            actual_key = self._parse_shape(key_shape, key)552            actual_value = self._parse_shape(value_shape, value)553            parsed[actual_key] = actual_value554        return parsed555    def _handle_blob(self, shape, value):556        return self._blob_parser(value)557    def _handle_timestamp(self, shape, value):558        return self._timestamp_parser(value)559    def _do_error_parse(self, response, shape):560        body = self._parse_body_as_json(response['body'])561        error = {"Error": {"Message": '', "Code": ''}, "ResponseMetadata": {}}562        # Error responses can have slightly different structures for json.563        # The basic structure is:564        #565        # {"__type":"ConnectClientException",566        #  "message":"The error message."}567        # The error message can either come in the 'message' or 'Message' key568        # so we need to check for both.569        error['Error']['Message'] = body.get('message',570                                             body.get('Message', ''))571        # if the message did not contain an error code572        # include the response status code573        response_code = response.get('status_code')574        code = body.get('__type', response_code and str(response_code))575        if code is not None:576            # code has a couple forms as well:577            # * "com.aws.dynamodb.vAPI#ProvisionedThroughputExceededException"578            # * "ResourceNotFoundException"579            if '#' in code:580                code = code.rsplit('#', 1)[1]581            error['Error']['Code'] = code582        self._inject_response_metadata(error, response['headers'])583        return error584    def _inject_response_metadata(self, parsed, headers):585        if 'x-amzn-requestid' in headers:586            parsed.setdefault('ResponseMetadata', {})['RequestId'] = (587                headers['x-amzn-requestid'])588    def _parse_body_as_json(self, body_contents):589        if not body_contents:590            return {}591        body = body_contents.decode(self.DEFAULT_ENCODING)592        try:593            original_parsed = json.loads(body)594            return original_parsed595        except ValueError:596            # if the body cannot be parsed, include597            # the literal string as the message598            return {'message': body}599class BaseEventStreamParser(ResponseParser):600    def _do_parse(self, response, shape):601        final_parsed = {}602        if shape.serialization.get('eventstream'):603            event_type = response['headers'].get(':event-type')604            event_shape = shape.members.get(event_type)605            if event_shape:606                final_parsed[event_type] = self._do_parse(response, event_shape)607        else:608            self._parse_non_payload_attrs(response, shape,609                                          shape.members, final_parsed)610            self._parse_payload(response, shape, shape.members, final_parsed)611        return final_parsed612    def _do_error_parse(self, response, shape):613        exception_type = response['headers'].get(':exception-type')614        exception_shape = shape.members.get(exception_type)615        if exception_shape is not None:616            original_parsed = self._initial_body_parse(response['body'])617            body = self._parse_shape(exception_shape, original_parsed)618            error = {619                'Error': {620                    'Code': exception_type,621                    'Message': body.get('Message', body.get('message', ''))622                }623            }624        else:625            error = {626                'Error': {627                    'Code': response['headers'].get(':error-code', ''),628                    'Message': response['headers'].get(':error-message', ''),629                }630            }631        return error632    def _parse_payload(self, response, shape, member_shapes, final_parsed):633        if shape.serialization.get('event'):634            for name in member_shapes:635                member_shape = member_shapes[name]636                if member_shape.serialization.get('eventpayload'):637                    body = response['body']638                    if member_shape.type_name == 'blob':639                        parsed_body = body640                    elif member_shape.type_name == 'string':641                        parsed_body = body.decode(self.DEFAULT_ENCODING)642                    else:643                        raw_parse = self._initial_body_parse(body)644                        parsed_body = self._parse_shape(member_shape, raw_parse)645                    final_parsed[name] = parsed_body646                    return647            # If we didn't find an explicit payload, use the current shape648            original_parsed = self._initial_body_parse(response['body'])649            body_parsed = self._parse_shape(shape, original_parsed)650            final_parsed.update(body_parsed)651    def _parse_non_payload_attrs(self, response, shape,652                                 member_shapes, final_parsed):653        headers = response['headers']654        for name in member_shapes:655            member_shape = member_shapes[name]656            if member_shape.serialization.get('eventheader'):657                if name in headers:658                    value = headers[name]659                    if member_shape.type_name == 'timestamp':660                        # Event stream timestamps are an in milleseconds so we661                        # divide by 1000 to convert to seconds.662                        value = self._timestamp_parser(value / 1000.0)663                    final_parsed[name] = value664    def _initial_body_parse(self, body_contents):665        # This method should do the initial xml/json parsing of the666        # body.  We we still need to walk the parsed body in order667        # to convert types, but this method will do the first round668        # of parsing.669        raise NotImplementedError("_initial_body_parse")670class EventStreamJSONParser(BaseEventStreamParser, BaseJSONParser):671    def _initial_body_parse(self, body_contents):672        return self._parse_body_as_json(body_contents)673class EventStreamXMLParser(BaseEventStreamParser, BaseXMLResponseParser):674    def _initial_body_parse(self, xml_string):675        if not xml_string:676            return ETree.Element('')677        return self._parse_xml_string_to_dom(xml_string)678class JSONParser(BaseJSONParser):679    EVENT_STREAM_PARSER_CLS = EventStreamJSONParser680    """Response parser for the "json" protocol."""681    def _do_parse(self, response, shape):682        parsed = {}683        if shape is not None:684            event_name = shape.event_stream_name685            if event_name:686                parsed = self._handle_event_stream(response, shape, event_name)687            else:688                parsed = self._handle_json_body(response['body'], shape)689        self._inject_response_metadata(parsed, response['headers'])690        return parsed691    def _do_modeled_error_parse(self, response, shape):692        return self._handle_json_body(response['body'], shape)693    def _handle_event_stream(self, response, shape, event_name):694        event_stream_shape = shape.members[event_name]695        event_stream = self._create_event_stream(response, event_stream_shape)696        try:697            event = event_stream.get_initial_response()698        except NoInitialResponseError:699            error_msg = 'First event was not of type initial-response'700            raise ResponseParserError(error_msg)701        parsed = self._handle_json_body(event.payload, shape)702        parsed[event_name] = event_stream703        return parsed704    def _handle_json_body(self, raw_body, shape):705        # The json.loads() gives us the primitive JSON types,706        # but we need to traverse the parsed JSON data to convert707        # to richer types (blobs, timestamps, etc.708        parsed_json = self._parse_body_as_json(raw_body)709        return self._parse_shape(shape, parsed_json)710class BaseRestParser(ResponseParser):711    def _do_parse(self, response, shape):712        final_parsed = {}713        final_parsed['ResponseMetadata'] = self._populate_response_metadata(714            response)715        self._add_modeled_parse(response, shape, final_parsed)716        return final_parsed717    def _add_modeled_parse(self, response, shape, final_parsed):718        if shape is None:719            return final_parsed720        member_shapes = shape.members721        self._parse_non_payload_attrs(response, shape,722                                      member_shapes, final_parsed)723        self._parse_payload(response, shape, member_shapes, final_parsed)724    def _do_modeled_error_parse(self, response, shape):725        final_parsed = {}726        self._add_modeled_parse(response, shape, final_parsed)727        return final_parsed728    def _populate_response_metadata(self, response):729        metadata = {}730        headers = response['headers']731        if 'x-amzn-requestid' in headers:732            metadata['RequestId'] = headers['x-amzn-requestid']733        elif 'x-amz-request-id' in headers:734            metadata['RequestId'] = headers['x-amz-request-id']735            # HostId is what it's called whenever this value is returned736            # in an XML response body, so to be consistent, we'll always737            # call is HostId.738            metadata['HostId'] = headers.get('x-amz-id-2', '')739        return metadata740    def _parse_payload(self, response, shape, member_shapes, final_parsed):741        if 'payload' in shape.serialization:742            # If a payload is specified in the output shape, then only that743            # shape is used for the body payload.744            payload_member_name = shape.serialization['payload']745            body_shape = member_shapes[payload_member_name]746            if body_shape.serialization.get('eventstream'):747                body = self._create_event_stream(response, body_shape)748                final_parsed[payload_member_name] = body749            elif body_shape.type_name in ['string', 'blob']:750                # This is a stream751                body = response['body']752                if isinstance(body, bytes):753                    body = body.decode(self.DEFAULT_ENCODING)754                final_parsed[payload_member_name] = body755            else:756                original_parsed = self._initial_body_parse(response['body'])757                final_parsed[payload_member_name] = self._parse_shape(758                    body_shape, original_parsed)759        else:760            original_parsed = self._initial_body_parse(response['body'])761            body_parsed = self._parse_shape(shape, original_parsed)762            final_parsed.update(body_parsed)763    def _parse_non_payload_attrs(self, response, shape,764                                 member_shapes, final_parsed):765        headers = response['headers']766        for name in member_shapes:767            member_shape = member_shapes[name]768            location = member_shape.serialization.get('location')769            if location is None:770                continue771            elif location == 'statusCode':772                final_parsed[name] = self._parse_shape(773                    member_shape, response['status_code'])774            elif location == 'headers':775                final_parsed[name] = self._parse_header_map(member_shape,776                                                            headers)777            elif location == 'header':778                header_name = member_shape.serialization.get('name', name)779                if header_name in headers:780                    final_parsed[name] = self._parse_shape(781                        member_shape, headers[header_name])782    def _parse_header_map(self, shape, headers):783        # Note that headers are case insensitive, so we .lower()784        # all header names and header prefixes.785        parsed = {}786        prefix = shape.serialization.get('name', '').lower()787        for header_name in headers:788            if header_name.lower().startswith(prefix):789                # The key name inserted into the parsed hash790                # strips off the prefix.791                name = header_name[len(prefix):]792                parsed[name] = headers[header_name]793        return parsed794    def _initial_body_parse(self, body_contents):795        # This method should do the initial xml/json parsing of the796        # body.  We we still need to walk the parsed body in order797        # to convert types, but this method will do the first round798        # of parsing.799        raise NotImplementedError("_initial_body_parse")800    def _handle_string(self, shape, value):801        parsed = value802        if is_json_value_header(shape):803            decoded = base64.b64decode(value).decode(self.DEFAULT_ENCODING)804            parsed = json.loads(decoded)805        return parsed806    def _handle_list(self, shape, node):807        location = shape.serialization.get('location')808        if location == 'header' and not isinstance(node, list):809            # List in headers may be a comma separated string as per RFC7230810            node = [e.strip() for e in node.split(',')]811        return super(BaseRestParser, self)._handle_list(shape, node)812class RestJSONParser(BaseRestParser, BaseJSONParser):813    EVENT_STREAM_PARSER_CLS = EventStreamJSONParser814    def _initial_body_parse(self, body_contents):815        return self._parse_body_as_json(body_contents)816    def _do_error_parse(self, response, shape):817        error = super(RestJSONParser, self)._do_error_parse(response, shape)818        self._inject_error_code(error, response)819        return error820    def _inject_error_code(self, error, response):821        # The "Code" value can come from either a response822        # header or a value in the JSON body.823        body = self._initial_body_parse(response['body'])824        if 'x-amzn-errortype' in response['headers']:825            code = response['headers']['x-amzn-errortype']826            # Could be:827            # x-amzn-errortype: ValidationException:828            code = code.split(':')[0]829            error['Error']['Code'] = code830        elif 'code' in body or 'Code' in body:831            error['Error']['Code'] = body.get(832                'code', body.get('Code', ''))833class RestXMLParser(BaseRestParser, BaseXMLResponseParser):834    EVENT_STREAM_PARSER_CLS = EventStreamXMLParser835    def _initial_body_parse(self, xml_string):836        if not xml_string:837            return ETree.Element('')838        return self._parse_xml_string_to_dom(xml_string)839    def _do_error_parse(self, response, shape):840        # We're trying to be service agnostic here, but S3 does have a slightly841        # different response structure for its errors compared to other842        # rest-xml serivces (route53/cloudfront).  We handle this by just843        # trying to parse both forms.844        # First:845        # <ErrorResponse xmlns="...">846        #   <Error>847        #     <Type>Sender</Type>848        #     <Code>InvalidInput</Code>849        #     <Message>Invalid resource type: foo</Message>850        #   </Error>851        #   <RequestId>request-id</RequestId>852        # </ErrorResponse>853        if response['body']:854            # If the body ends up being invalid xml, the xml parser should not855            # blow up. It should at least try to pull information about the856            # the error response from other sources like the HTTP status code.857            try:858                return self._parse_error_from_body(response)859            except ResponseParserError:860                LOG.debug(861                    'Exception caught when parsing error response body:',862                    exc_info=True)863        return self._parse_error_from_http_status(response)864    def _parse_error_from_http_status(self, response):865        return {866            'Error': {867                'Code': str(response['status_code']),868                'Message': six.moves.http_client.responses.get(869                    response['status_code'], ''),870            },871            'ResponseMetadata': {872                'RequestId': response['headers'].get('x-amz-request-id', ''),873                'HostId': response['headers'].get('x-amz-id-2', ''),874            }875        }876    def _parse_error_from_body(self, response):877        xml_contents = response['body']878        root = self._parse_xml_string_to_dom(xml_contents)879        parsed = self._build_name_to_xml_node(root)880        self._replace_nodes(parsed)881        if root.tag == 'Error':882            # This is an S3 error response.  First we'll populate the883            # response metadata.884            metadata = self._populate_response_metadata(response)885            # The RequestId and the HostId are already in the886            # ResponseMetadata, but are also duplicated in the XML887            # body.  We don't need these values in both places,888            # we'll just remove them from the parsed XML body.889            parsed.pop('RequestId', '')890            parsed.pop('HostId', '')891            return {'Error': parsed, 'ResponseMetadata': metadata}892        elif 'RequestId' in parsed:...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
