Best Python code snippet using localstack_python
parsers.py
Source:parsers.py  
...251        return super(BaseXMLResponseParser, self)._handle_list(shape, node)252    def _handle_structure(self, shape, node):253        parsed = {}254        members = shape.members255        xml_dict = self._build_name_to_xml_node(node)256        for member_name in members:257            member_shape = members[member_name]258            if 'location' in member_shape.serialization:259                # All members with locations have already been handled,260                # so we don't need to parse these members.261                continue262            xml_name = self._member_key_name(member_shape, member_name)263            member_node = xml_dict.get(xml_name)264            if member_node is not None:265                parsed[member_name] = self._parse_shape(266                    member_shape, member_node)267            elif member_shape.serialization.get('xmlAttribute'):268                attribs = {}269                location_name = member_shape.serialization['name']270                for key, value in node.attrib.items():271                    new_key = self._namespace_re.sub(272                        location_name.split(':')[0] + ':', key)273                    attribs[new_key] = value274                if location_name in attribs:275                    parsed[member_name] = attribs[location_name]276        return parsed277    def _member_key_name(self, shape, member_name):278        # This method is needed because we have to special case flattened list279        # with a serialization name.  
If this is the case we use the280        # locationName from the list's member shape as the key name for the281        # surrounding structure.282        if shape.type_name == 'list' and shape.serialization.get('flattened'):283            list_member_serialized_name = shape.member.serialization.get(284                'name')285            if list_member_serialized_name is not None:286                return list_member_serialized_name287        serialized_name = shape.serialization.get('name')288        if serialized_name is not None:289            return serialized_name290        return member_name291    def _build_name_to_xml_node(self, parent_node):292        # If the parent node is actually a list. We should not be trying293        # to serialize it to a dictionary. Instead, return the first element294        # in the list.295        if isinstance(parent_node, list):296            return self._build_name_to_xml_node(parent_node[0])297        xml_dict = {}298        for item in parent_node:299            key = self._node_tag(item)300            if key in xml_dict:301                # If the key already exists, the most natural302                # way to handle this is to aggregate repeated303                # keys into a single list.304                # <foo>1</foo><foo>2</foo> -> {'foo': [Node(1), Node(2)]}305                if isinstance(xml_dict[key], list):306                    xml_dict[key].append(item)307                else:308                    # Convert from a scalar to a list.309                    xml_dict[key] = [xml_dict[key], item]310            else:311                xml_dict[key] = item312        return xml_dict313    def _parse_xml_string_to_dom(self, xml_string):314        try:315            parser = xml.etree.cElementTree.XMLParser(316                target=xml.etree.cElementTree.TreeBuilder(),317                encoding=self.DEFAULT_ENCODING)318            parser.feed(xml_string)319            root = parser.close()320        except 
XMLParseError as e:321            raise ResponseParserError(322                "Unable to parse response (%s), "323                "invalid XML received:\n%s" % (e, xml_string))324        return root325    def _replace_nodes(self, parsed):326        for key, value in parsed.items():327            if value.getchildren():328                sub_dict = self._build_name_to_xml_node(value)329                parsed[key] = self._replace_nodes(sub_dict)330            else:331                parsed[key] = value.text332        return parsed333    @_text_content334    def _handle_boolean(self, shape, text):335        if text == 'true':336            return True337        else:338            return False339    @_text_content340    def _handle_float(self, shape, text):341        return float(text)342    @_text_content343    def _handle_timestamp(self, shape, text):344        return self._timestamp_parser(text)345    @_text_content346    def _handle_integer(self, shape, text):347        return int(text)348    @_text_content349    def _handle_string(self, shape, text):350        return text351    @_text_content352    def _handle_blob(self, shape, text):353        return self._blob_parser(text)354    _handle_character = _handle_string355    _handle_double = _handle_float356    _handle_long = _handle_integer357class QueryParser(BaseXMLResponseParser):358    def _do_error_parse(self, response, shape):359        xml_contents = response['body']360        root = self._parse_xml_string_to_dom(xml_contents)361        parsed = self._build_name_to_xml_node(root)362        self._replace_nodes(parsed)363        # Once we've converted xml->dict, we need to make one or two364        # more adjustments to extract nested errors and to be consistent365        # with ResponseMetadata for non-error responses:366        # 1. {"Errors": {"Error": {...}}} -> {"Error": {...}}367        # 2. 
{"RequestId": "id"} -> {"ResponseMetadata": {"RequestId": "id"}}368        if 'Errors' in parsed:369            parsed.update(parsed.pop('Errors'))370        if 'RequestId' in parsed:371            parsed['ResponseMetadata'] = {'RequestId': parsed.pop('RequestId')}372        return parsed373    def _do_parse(self, response, shape):374        xml_contents = response['body']375        root = self._parse_xml_string_to_dom(xml_contents)376        parsed = {}377        if shape is not None:378            start = root379            if 'resultWrapper' in shape.serialization:380                start = self._find_result_wrapped_shape(381                    shape.serialization['resultWrapper'],382                    root)383            parsed = self._parse_shape(shape, start)384        self._inject_response_metadata(root, parsed)385        return parsed386    def _find_result_wrapped_shape(self, element_name, xml_root_node):387        mapping = self._build_name_to_xml_node(xml_root_node)388        return mapping[element_name]389    def _inject_response_metadata(self, node, inject_into):390        mapping = self._build_name_to_xml_node(node)391        child_node = mapping.get('ResponseMetadata')392        if child_node is not None:393            sub_mapping = self._build_name_to_xml_node(child_node)394            for key, value in sub_mapping.items():395                sub_mapping[key] = value.text396            inject_into['ResponseMetadata'] = sub_mapping397class EC2QueryParser(QueryParser):398    def _inject_response_metadata(self, node, inject_into):399        mapping = self._build_name_to_xml_node(node)400        child_node = mapping.get('requestId')401        if child_node is not None:402            inject_into['ResponseMetadata'] = {'RequestId': child_node.text}403    def _do_error_parse(self, response, shape):404        # EC2 errors look like:405        # <Response>406        #   <Errors>407        #     <Error>408        #       
<Code>InvalidInstanceID.Malformed</Code>409        #       <Message>Invalid id: "1343124"</Message>410        #     </Error>411        #   </Errors>412        #   <RequestID>12345</RequestID>413        # </Response>414        # This is different from QueryParser in that it's RequestID,415        # not RequestId416        original = super(EC2QueryParser, self)._do_error_parse(response, shape)417        original['ResponseMetadata'] = {418            'RequestId': original.pop('RequestID')419        }420        return original421class BaseJSONParser(ResponseParser):422    def _handle_structure(self, shape, value):423        member_shapes = shape.members424        if value is None:425            # If the comes across the wire as "null" (None in python),426            # we should be returning this unchanged, instead of as an427            # empty dict.428            return None429        final_parsed = {}430        for member_name in member_shapes:431            member_shape = member_shapes[member_name]432            json_name = member_shape.serialization.get('name', member_name)433            raw_value = value.get(json_name)434            if raw_value is not None:435                final_parsed[member_name] = self._parse_shape(436                    member_shapes[member_name],437                    raw_value)438        return final_parsed439    def _handle_map(self, shape, value):440        parsed = {}441        key_shape = shape.key442        value_shape = shape.value443        for key, value in value.items():444            actual_key = self._parse_shape(key_shape, key)445            actual_value = self._parse_shape(value_shape, value)446            parsed[actual_key] = actual_value447        return parsed448    def _handle_blob(self, shape, value):449        return self._blob_parser(value)450    def _handle_timestamp(self, shape, value):451        return self._timestamp_parser(value)452    def _do_error_parse(self, response, shape):453        body = 
self._parse_body_as_json(response['body'])454        error = {"Error": {"Message": '', "Code": ''}, "ResponseMetadata": {}}455        # Error responses can have slightly different structures for json.456        # The basic structure is:457        #458        # {"__type":"ConnectClientException",459        #  "message":"The error message."}460        # The error message can either come in the 'message' or 'Message' key461        # so we need to check for both.462        error['Error']['Message'] = body.get('message',463                                             body.get('Message', ''))464        code = body.get('__type')465        if code is not None:466            # code has a couple forms as well:467            # * "com.aws.dynamodb.vAPI#ProvisionedThroughputExceededException"468            # * "ResourceNotFoundException"469            if '#' in code:470                code = code.rsplit('#', 1)[1]471            error['Error']['Code'] = code472        self._inject_response_metadata(error, response['headers'])473        return error474    def _inject_response_metadata(self, parsed, headers):475        if 'x-amzn-requestid' in headers:476            parsed.setdefault('ResponseMetadata', {})['RequestId'] = (477                headers['x-amzn-requestid'])478    def _parse_body_as_json(self, body_contents):479        if not body_contents:480            return {}481        body = body_contents.decode(self.DEFAULT_ENCODING)482        original_parsed = json.loads(body)483        return original_parsed484class JSONParser(BaseJSONParser):485    """Response parse for the "json" protocol."""486    def _do_parse(self, response, shape):487        # The json.loads() gives us the primitive JSON types,488        # but we need to traverse the parsed JSON data to convert489        # to richer types (blobs, timestamps, etc.490        parsed = {}491        if shape is not None:492            original_parsed = self._parse_body_as_json(response['body'])493            parsed = 
self._parse_shape(shape, original_parsed)494        self._inject_response_metadata(parsed, response['headers'])495        return parsed496class BaseRestParser(ResponseParser):497    def _do_parse(self, response, shape):498        final_parsed = {}499        final_parsed['ResponseMetadata'] = self._populate_response_metadata(500            response)501        if shape is None:502            return final_parsed503        member_shapes = shape.members504        self._parse_non_payload_attrs(response, shape,505                                      member_shapes, final_parsed)506        self._parse_payload(response, shape, member_shapes, final_parsed)507        return final_parsed508    def _populate_response_metadata(self, response):509        metadata = {}510        headers = response['headers']511        if 'x-amzn-requestid' in headers:512            metadata['RequestId'] = headers['x-amzn-requestid']513        elif 'x-amz-request-id' in headers:514            metadata['RequestId'] = headers['x-amz-request-id']515            # HostId is what it's called whenver this value is returned516            # in an XML response body, so to be consistent, we'll always517            # call is HostId.518            metadata['HostId'] = headers.get('x-amz-id-2', '')519        return metadata520    def _parse_payload(self, response, shape, member_shapes, final_parsed):521        if 'payload' in shape.serialization:522            # If a payload is specified in the output shape, then only that523            # shape is used for the body payload.524            payload_member_name = shape.serialization['payload']525            body_shape = member_shapes[payload_member_name]526            if body_shape.type_name in ['string', 'blob']:527                # This is a stream528                body = response['body']529                if isinstance(body, bytes):530                    body = body.decode(self.DEFAULT_ENCODING)531                final_parsed[payload_member_name] = body532       
     else:533                original_parsed = self._initial_body_parse(response['body'])534                final_parsed[payload_member_name] = self._parse_shape(535                    body_shape, original_parsed)536        else:537            original_parsed = self._initial_body_parse(response['body'])538            body_parsed = self._parse_shape(shape, original_parsed)539            final_parsed.update(body_parsed)540    def _parse_non_payload_attrs(self, response, shape,541                                 member_shapes, final_parsed):542        headers = response['headers']543        for name in member_shapes:544            member_shape = member_shapes[name]545            location = member_shape.serialization.get('location')546            if location is None:547                continue548            elif location == 'statusCode':549                final_parsed[name] = self._parse_shape(550                    member_shape, response['status_code'])551            elif location == 'headers':552                final_parsed[name] = self._parse_header_map(member_shape,553                                                            headers)554            elif location == 'header':555                header_name = member_shape.serialization.get('name', name)556                if header_name in headers:557                    final_parsed[name] = self._parse_shape(558                        member_shape, headers[header_name])559    def _parse_header_map(self, shape, headers):560        # Note that headers are case insensitive, so we .lower()561        # all header names and header prefixes.562        parsed = {}563        prefix = shape.serialization.get('name', '').lower()564        for header_name in headers:565            if header_name.lower().startswith(prefix):566                # The key name inserted into the parsed hash567                # strips off the prefix.568                name = header_name[len(prefix):]569                parsed[name] = 
headers[header_name]570        return parsed571    def _initial_body_parse(self, body_contents):572        # This method should do the initial xml/json parsing of the573        # body.  We we still need to walk the parsed body in order574        # to convert types, but this method will do the first round575        # of parsing.576        raise NotImplementedError("_initial_body_parse")577class RestJSONParser(BaseRestParser, BaseJSONParser):578    def _initial_body_parse(self, body_contents):579        return self._parse_body_as_json(body_contents)580    def _do_error_parse(self, response, shape):581        error = super(RestJSONParser, self)._do_error_parse(response, shape)582        self._inject_error_code(error, response)583        return error584    def _inject_error_code(self, error, response):585        # The "Code" value can come from either a response586        # header or a value in the JSON body.587        body = self._initial_body_parse(response['body'])588        if 'x-amzn-errortype' in response['headers']:589            code = response['headers']['x-amzn-errortype']590            # Could be:591            # x-amzn-errortype: ValidationException:592            code = code.split(':')[0]593            error['Error']['Code'] = code594        elif 'code' in body or 'Code' in body:595            error['Error']['Code'] = body.get(596                'code', body.get('Code', ''))597class RestXMLParser(BaseRestParser, BaseXMLResponseParser):598    def _initial_body_parse(self, xml_string):599        if not xml_string:600            return xml.etree.cElementTree.Element('')601        return self._parse_xml_string_to_dom(xml_string)602    def _do_error_parse(self, response, shape):603        # We're trying to be service agnostic here, but S3 does have a slightly604        # different response structure for its errors compared to other605        # rest-xml serivces (route53/cloudfront).  
We handle this by just606        # trying to parse both forms.607        # First:608        # <ErrorResponse xmlns="...">609        #   <Error>610        #     <Type>Sender</Type>611        #     <Code>InvalidInput</Code>612        #     <Message>Invalid resource type: foo</Message>613        #   </Error>614        #   <RequestId>request-id</RequestId>615        # </ErrorResponse>616        if response['body']:617            # If the body ends up being invalid xml, the xml parser should not618            # blow up. It should at least try to pull information about the619            # the error response from other sources like the HTTP status code.620            try:621                return self._parse_error_from_body(response)622            except ResponseParserError as e:623                LOG.debug(624                    'Exception caught when parsing error response body:',625                    exc_info=True)626        return self._parse_error_from_http_status(response)627    def _parse_error_from_http_status(self, response):628        return {629            'Error': {630                'Code': str(response['status_code']),631                'Message': six.moves.http_client.responses.get(632                    response['status_code'], ''),633            },634            'ResponseMetadata': {635                'RequestId': response['headers'].get('x-amz-request-id', ''),636                'HostId': response['headers'].get('x-amz-id-2', ''),637            }638        }639    def _parse_error_from_body(self, response):640        xml_contents = response['body']641        root = self._parse_xml_string_to_dom(xml_contents)642        parsed = self._build_name_to_xml_node(root)643        self._replace_nodes(parsed)644        if root.tag == 'Error':645            # This is an S3 error response.  
First we'll populate the646            # response metadata.647            metadata = self._populate_response_metadata(response)648            # The RequestId and the HostId are already in the649            # ResponseMetadata, but are also duplicated in the XML650            # body.  We don't need these values in both places,651            # we'll just remove them from the parsed XML body.652            parsed.pop('RequestId', '')653            parsed.pop('HostId', '')654            return {'Error': parsed, 'ResponseMetadata': metadata}655        elif 'RequestId' in parsed:656            # Other rest-xml serivces:...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
