How to use the _build_name_to_xml_node method in localstack

Best Python code snippet using localstack_python

parsers.py

Source:parsers.py Github

copy

Full Screen

...251 return super(BaseXMLResponseParser, self)._handle_list(shape, node)252 def _handle_structure(self, shape, node):253 parsed = {}254 members = shape.members255 xml_dict = self._build_name_to_xml_node(node)256 for member_name in members:257 member_shape = members[member_name]258 if 'location' in member_shape.serialization:259 # All members with locations have already been handled,260 # so we don't need to parse these members.261 continue262 xml_name = self._member_key_name(member_shape, member_name)263 member_node = xml_dict.get(xml_name)264 if member_node is not None:265 parsed[member_name] = self._parse_shape(266 member_shape, member_node)267 elif member_shape.serialization.get('xmlAttribute'):268 attribs = {}269 location_name = member_shape.serialization['name']270 for key, value in node.attrib.items():271 new_key = self._namespace_re.sub(272 location_name.split(':')[0] + ':', key)273 attribs[new_key] = value274 if location_name in attribs:275 parsed[member_name] = attribs[location_name]276 return parsed277 def _member_key_name(self, shape, member_name):278 # This method is needed because we have to special case flattened list279 # with a serialization name. If this is the case we use the280 # locationName from the list's member shape as the key name for the281 # surrounding structure.282 if shape.type_name == 'list' and shape.serialization.get('flattened'):283 list_member_serialized_name = shape.member.serialization.get(284 'name')285 if list_member_serialized_name is not None:286 return list_member_serialized_name287 serialized_name = shape.serialization.get('name')288 if serialized_name is not None:289 return serialized_name290 return member_name291 def _build_name_to_xml_node(self, parent_node):292 # If the parent node is actually a list. We should not be trying293 # to serialize it to a dictionary. 
Instead, return the first element294 # in the list.295 if isinstance(parent_node, list):296 return self._build_name_to_xml_node(parent_node[0])297 xml_dict = {}298 for item in parent_node:299 key = self._node_tag(item)300 if key in xml_dict:301 # If the key already exists, the most natural302 # way to handle this is to aggregate repeated303 # keys into a single list.304 # <foo>1</foo><foo>2</foo> -> {'foo': [Node(1), Node(2)]}305 if isinstance(xml_dict[key], list):306 xml_dict[key].append(item)307 else:308 # Convert from a scalar to a list.309 xml_dict[key] = [xml_dict[key], item]310 else:311 xml_dict[key] = item312 return xml_dict313 def _parse_xml_string_to_dom(self, xml_string):314 try:315 parser = xml.etree.cElementTree.XMLParser(316 target=xml.etree.cElementTree.TreeBuilder(),317 encoding=self.DEFAULT_ENCODING)318 parser.feed(xml_string)319 root = parser.close()320 except XMLParseError as e:321 raise ResponseParserError(322 "Unable to parse response (%s), "323 "invalid XML received:\n%s" % (e, xml_string))324 return root325 def _replace_nodes(self, parsed):326 for key, value in parsed.items():327 if value.getchildren():328 sub_dict = self._build_name_to_xml_node(value)329 parsed[key] = self._replace_nodes(sub_dict)330 else:331 parsed[key] = value.text332 return parsed333 @_text_content334 def _handle_boolean(self, shape, text):335 if text == 'true':336 return True337 else:338 return False339 @_text_content340 def _handle_float(self, shape, text):341 return float(text)342 @_text_content343 def _handle_timestamp(self, shape, text):344 return self._timestamp_parser(text)345 @_text_content346 def _handle_integer(self, shape, text):347 return int(text)348 @_text_content349 def _handle_string(self, shape, text):350 return text351 @_text_content352 def _handle_blob(self, shape, text):353 return self._blob_parser(text)354 _handle_character = _handle_string355 _handle_double = _handle_float356 _handle_long = _handle_integer357class 
QueryParser(BaseXMLResponseParser):358 def _do_error_parse(self, response, shape):359 xml_contents = response['body']360 root = self._parse_xml_string_to_dom(xml_contents)361 parsed = self._build_name_to_xml_node(root)362 self._replace_nodes(parsed)363 # Once we've converted xml->dict, we need to make one or two364 # more adjustments to extract nested errors and to be consistent365 # with ResponseMetadata for non-error responses:366 # 1. {"Errors": {"Error": {...}}} -> {"Error": {...}}367 # 2. {"RequestId": "id"} -> {"ResponseMetadata": {"RequestId": "id"}}368 if 'Errors' in parsed:369 parsed.update(parsed.pop('Errors'))370 if 'RequestId' in parsed:371 parsed['ResponseMetadata'] = {'RequestId': parsed.pop('RequestId')}372 return parsed373 def _do_parse(self, response, shape):374 xml_contents = response['body']375 root = self._parse_xml_string_to_dom(xml_contents)376 parsed = {}377 if shape is not None:378 start = root379 if 'resultWrapper' in shape.serialization:380 start = self._find_result_wrapped_shape(381 shape.serialization['resultWrapper'],382 root)383 parsed = self._parse_shape(shape, start)384 self._inject_response_metadata(root, parsed)385 return parsed386 def _find_result_wrapped_shape(self, element_name, xml_root_node):387 mapping = self._build_name_to_xml_node(xml_root_node)388 return mapping[element_name]389 def _inject_response_metadata(self, node, inject_into):390 mapping = self._build_name_to_xml_node(node)391 child_node = mapping.get('ResponseMetadata')392 if child_node is not None:393 sub_mapping = self._build_name_to_xml_node(child_node)394 for key, value in sub_mapping.items():395 sub_mapping[key] = value.text396 inject_into['ResponseMetadata'] = sub_mapping397class EC2QueryParser(QueryParser):398 def _inject_response_metadata(self, node, inject_into):399 mapping = self._build_name_to_xml_node(node)400 child_node = mapping.get('requestId')401 if child_node is not None:402 inject_into['ResponseMetadata'] = {'RequestId': child_node.text}403 def 
_do_error_parse(self, response, shape):404 # EC2 errors look like:405 # <Response>406 # <Errors>407 # <Error>408 # <Code>InvalidInstanceID.Malformed</Code>409 # <Message>Invalid id: "1343124"</Message>410 # </Error>411 # </Errors>412 # <RequestID>12345</RequestID>413 # </Response>414 # This is different from QueryParser in that it's RequestID,415 # not RequestId416 original = super(EC2QueryParser, self)._do_error_parse(response, shape)417 original['ResponseMetadata'] = {418 'RequestId': original.pop('RequestID')419 }420 return original421class BaseJSONParser(ResponseParser):422 def _handle_structure(self, shape, value):423 member_shapes = shape.members424 if value is None:425 # If the comes across the wire as "null" (None in python),426 # we should be returning this unchanged, instead of as an427 # empty dict.428 return None429 final_parsed = {}430 for member_name in member_shapes:431 member_shape = member_shapes[member_name]432 json_name = member_shape.serialization.get('name', member_name)433 raw_value = value.get(json_name)434 if raw_value is not None:435 final_parsed[member_name] = self._parse_shape(436 member_shapes[member_name],437 raw_value)438 return final_parsed439 def _handle_map(self, shape, value):440 parsed = {}441 key_shape = shape.key442 value_shape = shape.value443 for key, value in value.items():444 actual_key = self._parse_shape(key_shape, key)445 actual_value = self._parse_shape(value_shape, value)446 parsed[actual_key] = actual_value447 return parsed448 def _handle_blob(self, shape, value):449 return self._blob_parser(value)450 def _handle_timestamp(self, shape, value):451 return self._timestamp_parser(value)452 def _do_error_parse(self, response, shape):453 body = self._parse_body_as_json(response['body'])454 error = {"Error": {"Message": '', "Code": ''}, "ResponseMetadata": {}}455 # Error responses can have slightly different structures for json.456 # The basic structure is:457 #458 # {"__type":"ConnectClientException",459 # "message":"The 
error message."}460 # The error message can either come in the 'message' or 'Message' key461 # so we need to check for both.462 error['Error']['Message'] = body.get('message',463 body.get('Message', ''))464 code = body.get('__type')465 if code is not None:466 # code has a couple forms as well:467 # * "com.aws.dynamodb.vAPI#ProvisionedThroughputExceededException"468 # * "ResourceNotFoundException"469 if '#' in code:470 code = code.rsplit('#', 1)[1]471 error['Error']['Code'] = code472 self._inject_response_metadata(error, response['headers'])473 return error474 def _inject_response_metadata(self, parsed, headers):475 if 'x-amzn-requestid' in headers:476 parsed.setdefault('ResponseMetadata', {})['RequestId'] = (477 headers['x-amzn-requestid'])478 def _parse_body_as_json(self, body_contents):479 if not body_contents:480 return {}481 body = body_contents.decode(self.DEFAULT_ENCODING)482 original_parsed = json.loads(body)483 return original_parsed484class JSONParser(BaseJSONParser):485 """Response parse for the "json" protocol."""486 def _do_parse(self, response, shape):487 # The json.loads() gives us the primitive JSON types,488 # but we need to traverse the parsed JSON data to convert489 # to richer types (blobs, timestamps, etc.490 parsed = {}491 if shape is not None:492 original_parsed = self._parse_body_as_json(response['body'])493 parsed = self._parse_shape(shape, original_parsed)494 self._inject_response_metadata(parsed, response['headers'])495 return parsed496class BaseRestParser(ResponseParser):497 def _do_parse(self, response, shape):498 final_parsed = {}499 final_parsed['ResponseMetadata'] = self._populate_response_metadata(500 response)501 if shape is None:502 return final_parsed503 member_shapes = shape.members504 self._parse_non_payload_attrs(response, shape,505 member_shapes, final_parsed)506 self._parse_payload(response, shape, member_shapes, final_parsed)507 return final_parsed508 def _populate_response_metadata(self, response):509 metadata = {}510 
headers = response['headers']511 if 'x-amzn-requestid' in headers:512 metadata['RequestId'] = headers['x-amzn-requestid']513 elif 'x-amz-request-id' in headers:514 metadata['RequestId'] = headers['x-amz-request-id']515 # HostId is what it's called whenver this value is returned516 # in an XML response body, so to be consistent, we'll always517 # call is HostId.518 metadata['HostId'] = headers.get('x-amz-id-2', '')519 return metadata520 def _parse_payload(self, response, shape, member_shapes, final_parsed):521 if 'payload' in shape.serialization:522 # If a payload is specified in the output shape, then only that523 # shape is used for the body payload.524 payload_member_name = shape.serialization['payload']525 body_shape = member_shapes[payload_member_name]526 if body_shape.type_name in ['string', 'blob']:527 # This is a stream528 body = response['body']529 if isinstance(body, bytes):530 body = body.decode(self.DEFAULT_ENCODING)531 final_parsed[payload_member_name] = body532 else:533 original_parsed = self._initial_body_parse(response['body'])534 final_parsed[payload_member_name] = self._parse_shape(535 body_shape, original_parsed)536 else:537 original_parsed = self._initial_body_parse(response['body'])538 body_parsed = self._parse_shape(shape, original_parsed)539 final_parsed.update(body_parsed)540 def _parse_non_payload_attrs(self, response, shape,541 member_shapes, final_parsed):542 headers = response['headers']543 for name in member_shapes:544 member_shape = member_shapes[name]545 location = member_shape.serialization.get('location')546 if location is None:547 continue548 elif location == 'statusCode':549 final_parsed[name] = self._parse_shape(550 member_shape, response['status_code'])551 elif location == 'headers':552 final_parsed[name] = self._parse_header_map(member_shape,553 headers)554 elif location == 'header':555 header_name = member_shape.serialization.get('name', name)556 if header_name in headers:557 final_parsed[name] = self._parse_shape(558 
member_shape, headers[header_name])559 def _parse_header_map(self, shape, headers):560 # Note that headers are case insensitive, so we .lower()561 # all header names and header prefixes.562 parsed = {}563 prefix = shape.serialization.get('name', '').lower()564 for header_name in headers:565 if header_name.lower().startswith(prefix):566 # The key name inserted into the parsed hash567 # strips off the prefix.568 name = header_name[len(prefix):]569 parsed[name] = headers[header_name]570 return parsed571 def _initial_body_parse(self, body_contents):572 # This method should do the initial xml/json parsing of the573 # body. We we still need to walk the parsed body in order574 # to convert types, but this method will do the first round575 # of parsing.576 raise NotImplementedError("_initial_body_parse")577class RestJSONParser(BaseRestParser, BaseJSONParser):578 def _initial_body_parse(self, body_contents):579 return self._parse_body_as_json(body_contents)580 def _do_error_parse(self, response, shape):581 error = super(RestJSONParser, self)._do_error_parse(response, shape)582 self._inject_error_code(error, response)583 return error584 def _inject_error_code(self, error, response):585 # The "Code" value can come from either a response586 # header or a value in the JSON body.587 body = self._initial_body_parse(response['body'])588 if 'x-amzn-errortype' in response['headers']:589 code = response['headers']['x-amzn-errortype']590 # Could be:591 # x-amzn-errortype: ValidationException:592 code = code.split(':')[0]593 error['Error']['Code'] = code594 elif 'code' in body or 'Code' in body:595 error['Error']['Code'] = body.get(596 'code', body.get('Code', ''))597class RestXMLParser(BaseRestParser, BaseXMLResponseParser):598 def _initial_body_parse(self, xml_string):599 if not xml_string:600 return xml.etree.cElementTree.Element('')601 return self._parse_xml_string_to_dom(xml_string)602 def _do_error_parse(self, response, shape):603 # We're trying to be service agnostic here, but 
def _parse_error_from_http_status(self, response):
    """Build an error dict from the HTTP status and headers alone.

    The error ``Code`` is the status code as a string and the ``Message``
    is the matching entry of ``six.moves.http_client.responses`` (empty
    when the code is not listed).  ``RequestId`` and ``HostId`` are read
    from the ``x-amz-request-id`` and ``x-amz-id-2`` headers, defaulting
    to an empty string.

    :param response: Dict with ``'status_code'`` and ``'headers'``.
    :return: Dict with ``'Error'`` and ``'ResponseMetadata'`` entries.
    """
    status_code = response['status_code']
    error = {
        'Code': str(status_code),
        'Message': six.moves.http_client.responses.get(status_code, ''),
    }
    headers = response['headers']
    metadata = {
        'RequestId': headers.get('x-amz-request-id', ''),
        'HostId': headers.get('x-amz-id-2', ''),
    }
    return {'Error': error, 'ResponseMetadata': metadata}
We don't need these values in both places,651 # we'll just remove them from the parsed XML body.652 parsed.pop('RequestId', '')653 parsed.pop('HostId', '')654 return {'Error': parsed, 'ResponseMetadata': metadata}655 elif 'RequestId' in parsed:656 # Other rest-xml serivces:...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful