Best Python code snippet using localstack_python
parsers.py
Source:parsers.py  
...556        return self._blob_parser(value)557    def _handle_timestamp(self, shape, value):558        return self._timestamp_parser(value)559    def _do_error_parse(self, response, shape):560        body = self._parse_body_as_json(response['body'])561        error = {"Error": {"Message": '', "Code": ''}, "ResponseMetadata": {}}562        # Error responses can have slightly different structures for json.563        # The basic structure is:564        #565        # {"__type":"ConnectClientException",566        #  "message":"The error message."}567        # The error message can either come in the 'message' or 'Message' key568        # so we need to check for both.569        error['Error']['Message'] = body.get('message',570                                             body.get('Message', ''))571        # if the message did not contain an error code572        # include the response status code573        response_code = response.get('status_code')574        code = body.get('__type', response_code and str(response_code))575        if code is not None:576            # code has a couple forms as well:577            # * "com.aws.dynamodb.vAPI#ProvisionedThroughputExceededException"578            # * "ResourceNotFoundException"579            if '#' in code:580                code = code.rsplit('#', 1)[1]581            error['Error']['Code'] = code582        self._inject_response_metadata(error, response['headers'])583        return error584    def _inject_response_metadata(self, parsed, headers):585        if 'x-amzn-requestid' in headers:586            parsed.setdefault('ResponseMetadata', {})['RequestId'] = (587                headers['x-amzn-requestid'])588    def _parse_body_as_json(self, body_contents):589        if not body_contents:590            return {}591        body = body_contents.decode(self.DEFAULT_ENCODING)592        try:593            original_parsed = json.loads(body)594            return original_parsed595        except ValueError:596            # if the 
body cannot be parsed, include597            # the literal string as the message598            return {'message': body}599class BaseEventStreamParser(ResponseParser):600    def _do_parse(self, response, shape):601        final_parsed = {}602        if shape.serialization.get('eventstream'):603            event_type = response['headers'].get(':event-type')604            event_shape = shape.members.get(event_type)605            if event_shape:606                final_parsed[event_type] = self._do_parse(response, event_shape)607        else:608            self._parse_non_payload_attrs(response, shape,609                                          shape.members, final_parsed)610            self._parse_payload(response, shape, shape.members, final_parsed)611        return final_parsed612    def _do_error_parse(self, response, shape):613        exception_type = response['headers'].get(':exception-type')614        exception_shape = shape.members.get(exception_type)615        if exception_shape is not None:616            original_parsed = self._initial_body_parse(response['body'])617            body = self._parse_shape(exception_shape, original_parsed)618            error = {619                'Error': {620                    'Code': exception_type,621                    'Message': body.get('Message', body.get('message', ''))622                }623            }624        else:625            error = {626                'Error': {627                    'Code': response['headers'].get(':error-code', ''),628                    'Message': response['headers'].get(':error-message', ''),629                }630            }631        return error632    def _parse_payload(self, response, shape, member_shapes, final_parsed):633        if shape.serialization.get('event'):634            for name in member_shapes:635                member_shape = member_shapes[name]636                if member_shape.serialization.get('eventpayload'):637                    body = response['body']638        
            if member_shape.type_name == 'blob':639                        parsed_body = body640                    elif member_shape.type_name == 'string':641                        parsed_body = body.decode(self.DEFAULT_ENCODING)642                    else:643                        raw_parse = self._initial_body_parse(body)644                        parsed_body = self._parse_shape(member_shape, raw_parse)645                    final_parsed[name] = parsed_body646                    return647            # If we didn't find an explicit payload, use the current shape648            original_parsed = self._initial_body_parse(response['body'])649            body_parsed = self._parse_shape(shape, original_parsed)650            final_parsed.update(body_parsed)651    def _parse_non_payload_attrs(self, response, shape,652                                 member_shapes, final_parsed):653        headers = response['headers']654        for name in member_shapes:655            member_shape = member_shapes[name]656            if member_shape.serialization.get('eventheader'):657                if name in headers:658                    value = headers[name]659                    if member_shape.type_name == 'timestamp':660                        # Event stream timestamps are an in milleseconds so we661                        # divide by 1000 to convert to seconds.662                        value = self._timestamp_parser(value / 1000.0)663                    final_parsed[name] = value664    def _initial_body_parse(self, body_contents):665        # This method should do the initial xml/json parsing of the666        # body.  
We we still need to walk the parsed body in order667        # to convert types, but this method will do the first round668        # of parsing.669        raise NotImplementedError("_initial_body_parse")670class EventStreamJSONParser(BaseEventStreamParser, BaseJSONParser):671    def _initial_body_parse(self, body_contents):672        return self._parse_body_as_json(body_contents)673class EventStreamXMLParser(BaseEventStreamParser, BaseXMLResponseParser):674    def _initial_body_parse(self, xml_string):675        if not xml_string:676            return ETree.Element('')677        return self._parse_xml_string_to_dom(xml_string)678class JSONParser(BaseJSONParser):679    EVENT_STREAM_PARSER_CLS = EventStreamJSONParser680    """Response parser for the "json" protocol."""681    def _do_parse(self, response, shape):682        parsed = {}683        if shape is not None:684            event_name = shape.event_stream_name685            if event_name:686                parsed = self._handle_event_stream(response, shape, event_name)687            else:688                parsed = self._handle_json_body(response['body'], shape)689        self._inject_response_metadata(parsed, response['headers'])690        return parsed691    def _do_modeled_error_parse(self, response, shape):692        return self._handle_json_body(response['body'], shape)693    def _handle_event_stream(self, response, shape, event_name):694        event_stream_shape = shape.members[event_name]695        event_stream = self._create_event_stream(response, event_stream_shape)696        try:697            event = event_stream.get_initial_response()698        except NoInitialResponseError:699            error_msg = 'First event was not of type initial-response'700            raise ResponseParserError(error_msg)701        parsed = self._handle_json_body(event.payload, shape)702        parsed[event_name] = event_stream703        return parsed704    def _handle_json_body(self, raw_body, shape):705        # The 
json.loads() gives us the primitive JSON types,706        # but we need to traverse the parsed JSON data to convert707        # to richer types (blobs, timestamps, etc.708        parsed_json = self._parse_body_as_json(raw_body)709        return self._parse_shape(shape, parsed_json)710class BaseRestParser(ResponseParser):711    def _do_parse(self, response, shape):712        final_parsed = {}713        final_parsed['ResponseMetadata'] = self._populate_response_metadata(714            response)715        self._add_modeled_parse(response, shape, final_parsed)716        return final_parsed717    def _add_modeled_parse(self, response, shape, final_parsed):718        if shape is None:719            return final_parsed720        member_shapes = shape.members721        self._parse_non_payload_attrs(response, shape,722                                      member_shapes, final_parsed)723        self._parse_payload(response, shape, member_shapes, final_parsed)724    def _do_modeled_error_parse(self, response, shape):725        final_parsed = {}726        self._add_modeled_parse(response, shape, final_parsed)727        return final_parsed728    def _populate_response_metadata(self, response):729        metadata = {}730        headers = response['headers']731        if 'x-amzn-requestid' in headers:732            metadata['RequestId'] = headers['x-amzn-requestid']733        elif 'x-amz-request-id' in headers:734            metadata['RequestId'] = headers['x-amz-request-id']735            # HostId is what it's called whenever this value is returned736            # in an XML response body, so to be consistent, we'll always737            # call is HostId.738            metadata['HostId'] = headers.get('x-amz-id-2', '')739        return metadata740    def _parse_payload(self, response, shape, member_shapes, final_parsed):741        if 'payload' in shape.serialization:742            # If a payload is specified in the output shape, then only that743            # shape is used for the 
body payload.744            payload_member_name = shape.serialization['payload']745            body_shape = member_shapes[payload_member_name]746            if body_shape.serialization.get('eventstream'):747                body = self._create_event_stream(response, body_shape)748                final_parsed[payload_member_name] = body749            elif body_shape.type_name in ['string', 'blob']:750                # This is a stream751                body = response['body']752                if isinstance(body, bytes):753                    body = body.decode(self.DEFAULT_ENCODING)754                final_parsed[payload_member_name] = body755            else:756                original_parsed = self._initial_body_parse(response['body'])757                final_parsed[payload_member_name] = self._parse_shape(758                    body_shape, original_parsed)759        else:760            original_parsed = self._initial_body_parse(response['body'])761            body_parsed = self._parse_shape(shape, original_parsed)762            final_parsed.update(body_parsed)763    def _parse_non_payload_attrs(self, response, shape,764                                 member_shapes, final_parsed):765        headers = response['headers']766        for name in member_shapes:767            member_shape = member_shapes[name]768            location = member_shape.serialization.get('location')769            if location is None:770                continue771            elif location == 'statusCode':772                final_parsed[name] = self._parse_shape(773                    member_shape, response['status_code'])774            elif location == 'headers':775                final_parsed[name] = self._parse_header_map(member_shape,776                                                            headers)777            elif location == 'header':778                header_name = member_shape.serialization.get('name', name)779                if header_name in headers:780                    
final_parsed[name] = self._parse_shape(781                        member_shape, headers[header_name])782    def _parse_header_map(self, shape, headers):783        # Note that headers are case insensitive, so we .lower()784        # all header names and header prefixes.785        parsed = {}786        prefix = shape.serialization.get('name', '').lower()787        for header_name in headers:788            if header_name.lower().startswith(prefix):789                # The key name inserted into the parsed hash790                # strips off the prefix.791                name = header_name[len(prefix):]792                parsed[name] = headers[header_name]793        return parsed794    def _initial_body_parse(self, body_contents):795        # This method should do the initial xml/json parsing of the796        # body.  We we still need to walk the parsed body in order797        # to convert types, but this method will do the first round798        # of parsing.799        raise NotImplementedError("_initial_body_parse")800    def _handle_string(self, shape, value):801        parsed = value802        if is_json_value_header(shape):803            decoded = base64.b64decode(value).decode(self.DEFAULT_ENCODING)804            parsed = json.loads(decoded)805        return parsed806    def _handle_list(self, shape, node):807        location = shape.serialization.get('location')808        if location == 'header' and not isinstance(node, list):809            # List in headers may be a comma separated string as per RFC7230810            node = [e.strip() for e in node.split(',')]811        return super(BaseRestParser, self)._handle_list(shape, node)812class RestJSONParser(BaseRestParser, BaseJSONParser):813    EVENT_STREAM_PARSER_CLS = EventStreamJSONParser814    def _initial_body_parse(self, body_contents):815        return self._parse_body_as_json(body_contents)816    def _do_error_parse(self, response, shape):817        error = super(RestJSONParser, 
self)._do_error_parse(response, shape)818        self._inject_error_code(error, response)819        return error820    def _inject_error_code(self, error, response):821        # The "Code" value can come from either a response822        # header or a value in the JSON body.823        body = self._initial_body_parse(response['body'])824        if 'x-amzn-errortype' in response['headers']:825            code = response['headers']['x-amzn-errortype']826            # Could be:827            # x-amzn-errortype: ValidationException:828            code = code.split(':')[0]829            error['Error']['Code'] = code...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
