How to use _get_list_key_prefix method in localstack

Best Python code snippet using localstack_python

parser.py

Source:parser.py Github

copy

Full Screen

...431 }432 ::433 """434 # The keys might be prefixed (f.e. for flattened lists)435 key_prefix = self._get_list_key_prefix(shape)436 # We collect the list value as well as the integer indicating the list position so we can437 # later sort the list by the position, in case they attribute values are unordered438 result: List[Tuple[int, Any]] = []439 i = 0440 while True:441 i += 1442 key_name = f"{key_prefix}{i}"443 value = self._process_member(request, key_name, shape.member, node)444 if value is None:445 break446 result.append((i, value))447 return [r[1] for r in sorted(result)] if len(result) > 0 else None448 @staticmethod449 def _get_first(node: Any) -> Any:450 if isinstance(node, (list, tuple)):451 return node[0]452 return node453 @staticmethod454 def _filter_node(name: str, node: dict) -> dict:455 """Filters the node dict for entries where the key starts with the given name."""456 filtered = {k[len(name) + 1 :]: v for k, v in node.items() if k.startswith(name)}457 return filtered if len(filtered) > 0 else None458 def _get_serialized_name(self, shape: Shape, default_name: str) -> str:459 """460 Returns the serialized name for the shape if it exists.461 Otherwise it will return the given default_name.462 """463 return shape.serialization.get("name", default_name)464 def _get_list_key_prefix(self, shape: ListShape):465 key_prefix = ""466 # Non-flattened lists have an additional hierarchy level:467 # https://awslabs.github.io/smithy/1.0/spec/core/xml-traits.html#xmlflattened-trait468 # The hierarchy level's name is the serialization name of its member or (by default) "member".469 if not shape.serialization.get("flattened"):470 key_prefix += f"{self._get_serialized_name(shape.member, 'member')}."471 return key_prefix472class BaseRestRequestParser(RequestParser):473 """474 The ``BaseRestRequestParser`` is the base class for all "resty" AWS service protocols.475 The operation which should be invoked is determined based on the HTTP method and the path suffix.476 The body encoding is done in the respective subclasses.477 """478 def __init__(self, service: ServiceModel) -> None:479 super().__init__(service)480 self._operation_router = RestServiceOperationRouter(service)481 def _get_normalized_request_uri_length(self, operation_model: OperationModel) -> int:482 """483 Fings the length of the normalized request URI for the given operation model.484 See #_get_normalized_request_uri for a description of the normalization.485 """486 return len(self._get_normalized_request_uri(operation_model))487 def _get_normalized_request_uri(self, operation_model: OperationModel) -> str:488 """489 Fings the normalized request URI for the given operation model.490 A normalized request URI has a static, common replacement for path parameter placeholders, starting with a491 space character (which is the lowest non-control character in ASCII and is not expected to be present in a492 service specification's request URI pattern).493 This allows the resulting normalized request URIs to be sorted.494 :param operation_model: to get the normalized request URI for.495 This function expects that the given operation model has HTTP metadata!496 :return: normalized request URI for the given operation model497 """498 request_uri: str = operation_model.http.get("requestUri")499 # Make sure that all path parameter placeholders have the same name and length500 return re.sub(r"{(.*?)}", " param", request_uri)501 @_handle_exceptions502 def parse(self, request: HttpRequest) -> Tuple[OperationModel, Any]:503 try:504 operation, uri_params = self._operation_router.match(request)505 except NotFound as e:506 raise OperationNotFoundParserError(507 f"Unable to find operation for request to service "508 f"{self.service.service_name}: {request.method} {request.path}"509 ) from e510 shape: StructureShape = operation.input_shape511 final_parsed = {}512 if shape is not None:513 self._parse_payload(request, shape, shape.members, uri_params, final_parsed)514 return operation, final_parsed515 def _parse_payload(516 self,517 request: HttpRequest,518 shape: Shape,519 member_shapes: Dict[str, Shape],520 uri_params: Mapping[str, Any],521 final_parsed: dict,522 ) -> None:523 """Parses all attributes which are located in the payload / body of the incoming request."""524 payload_parsed = {}525 non_payload_parsed = {}526 if "payload" in shape.serialization:527 # If a payload is specified in the output shape, then only that shape is used for the body payload.528 payload_member_name = shape.serialization["payload"]529 body_shape = member_shapes[payload_member_name]530 if body_shape.serialization.get("eventstream"):531 body = self._create_event_stream(request, body_shape)532 payload_parsed[payload_member_name] = body533 elif body_shape.type_name == "string":534 # Only set the value if it's not empty (the request's data is an empty binary by default)535 if request.data:536 body = request.data537 if isinstance(body, bytes):538 body = body.decode(self.DEFAULT_ENCODING)539 payload_parsed[payload_member_name] = body540 elif body_shape.type_name == "blob":541 # Only set the value if it's not empty (the request's data is an empty binary by default)542 if request.data:543 payload_parsed[payload_member_name] = request.data544 else:545 original_parsed = self._initial_body_parse(request)546 payload_parsed[payload_member_name] = self._parse_shape(547 request, body_shape, original_parsed, uri_params548 )549 else:550 # The payload covers the whole body. We only parse the body if it hasn't been handled by the payload logic.551 non_payload_parsed = self._initial_body_parse(request)552 # even if the payload has been parsed, the rest of the shape needs to be processed as well553 # (for members which are located outside of the body, like uri or header)554 non_payload_parsed = self._parse_shape(request, shape, non_payload_parsed, uri_params)555 # update the final result with the parsed body and the parsed payload (where the payload has precedence)556 final_parsed.update(non_payload_parsed)557 final_parsed.update(payload_parsed)558 def _initial_body_parse(self, request: HttpRequest) -> Any:559 """560 This method executes the initial parsing of the body (XML, JSON, or CBOR).561 The parsed body will afterwards still be walked through and the nodes will be converted to the appropriate562 types, but this method does the first round of parsing.563 :param request: of which the body should be parsed564 :return: depending on the actual implementation565 """566 raise NotImplementedError("_initial_body_parse")567 def _create_event_stream(self, request: HttpRequest, shape: Shape) -> Any:568 # TODO handle event streams569 raise NotImplementedError("_create_event_stream")570class RestXMLRequestParser(BaseRestRequestParser):571 """572 The ``RestXMLRequestParser`` is responsible for parsing incoming requests for services which use the ``rest-xml``573 protocol. The requests for these services encode the majority of their parameters as XML in the request body.574 **Experimental:** This parser is still experimental.575 When implementing services with this parser, some edge cases might not work out-of-the-box.576 """577 def __init__(self, service_model: ServiceModel):578 super(RestXMLRequestParser, self).__init__(service_model)579 self._namespace_re = re.compile("{.*}")580 def _initial_body_parse(self, request: HttpRequest) -> ETree.Element:581 body = request.data582 if not body:583 return ETree.Element("")584 return self._parse_xml_string_to_dom(body)585 def _parse_structure(586 self,587 request: HttpRequest,588 shape: StructureShape,589 node: ETree.Element,590 uri_params: Mapping[str, Any] = None,591 ) -> dict:592 parsed = {}593 xml_dict = self._build_name_to_xml_node(node)594 for member_name, member_shape in shape.members.items():595 xml_name = self._member_key_name(member_shape, member_name)596 member_node = xml_dict.get(xml_name)597 # If a shape defines a location trait, the node might be None (since these are extracted from the request's598 # metadata like headers or the URI)599 if (600 member_node is not None601 or "location" in member_shape.serialization602 or member_shape.serialization.get("eventheader")603 ):604 parsed[member_name] = self._parse_shape(605 request, member_shape, member_node, uri_params606 )607 elif member_shape.serialization.get("xmlAttribute"):608 attributes = {}609 location_name = member_shape.serialization["name"]610 for key, value in node.attrib.items():611 new_key = self._namespace_re.sub(location_name.split(":")[0] + ":", key)612 attributes[new_key] = value613 if location_name in attributes:614 parsed[member_name] = attributes[location_name]615 elif member_name in shape.required_members:616 # If the member is required, but not existing, we explicitly set None617 parsed[member_name] = None618 return parsed619 def _parse_map(620 self,621 request: HttpRequest,622 shape: MapShape,623 node: dict,624 uri_params: Mapping[str, Any] = None,625 ) -> dict:626 parsed = {}627 key_shape = shape.key628 value_shape = shape.value629 key_location_name = key_shape.serialization.get("name", "key")630 value_location_name = value_shape.serialization.get("name", "value")631 if shape.serialization.get("flattened") and not isinstance(node, list):632 node = [node]633 for keyval_node in node:634 key_name = val_name = None635 for single_pair in keyval_node:636 # Within each <entry> there's a <key> and a <value>637 tag_name = self._node_tag(single_pair)638 if tag_name == key_location_name:639 key_name = self._parse_shape(request, key_shape, single_pair, uri_params)640 elif tag_name == value_location_name:641 val_name = self._parse_shape(request, value_shape, single_pair, uri_params)642 else:643 raise ProtocolParserError("Unknown tag: %s" % tag_name)644 parsed[key_name] = val_name645 return parsed646 def _parse_list(647 self,648 request: HttpRequest,649 shape: ListShape,650 node: dict,651 uri_params: Mapping[str, Any] = None,652 ) -> list:653 # When we use _build_name_to_xml_node, repeated elements are aggregated654 # into a list. However, we can't tell the difference between a scalar655 # value and a single element flattened list. So before calling the656 # real _handle_list, we know that "node" should actually be a list if657 # it's flattened, and if it's not, then we make it a one element list.658 if shape.serialization.get("flattened") and not isinstance(node, list):659 node = [node]660 return super(RestXMLRequestParser, self)._parse_list(request, shape, node, uri_params)661 def _node_tag(self, node: ETree.Element) -> str:662 return self._namespace_re.sub("", node.tag)663 @staticmethod664 def _member_key_name(shape: Shape, member_name: str) -> str:665 # This method is needed because we have to special case flattened list666 # with a serialization name. If this is the case we use the667 # locationName from the list's member shape as the key name for the668 # surrounding structure.669 if isinstance(shape, ListShape) and shape.serialization.get("flattened"):670 list_member_serialized_name = shape.member.serialization.get("name")671 if list_member_serialized_name is not None:672 return list_member_serialized_name673 serialized_name = shape.serialization.get("name")674 if serialized_name is not None:675 return serialized_name676 return member_name677 def _parse_xml_string_to_dom(self, xml_string: bytes) -> ETree.Element:678 try:679 parser = ETree.XMLParser(target=ETree.TreeBuilder(), encoding=self.DEFAULT_ENCODING)680 parser.feed(xml_string)681 root = parser.close()682 except ETree.ParseError as e:683 raise ProtocolParserError(684 "Unable to parse request (%s), invalid XML received:\n%s" % (e, xml_string)685 ) from e686 return root687 def _build_name_to_xml_node(self, parent_node: Union[list, ETree.Element]) -> dict:688 # If the parent node is actually a list. We should not be trying689 # to serialize it to a dictionary. Instead, return the first element690 # in the list.691 if isinstance(parent_node, list):692 return self._build_name_to_xml_node(parent_node[0])693 xml_dict = {}694 for item in parent_node:695 key = self._node_tag(item)696 if key in xml_dict:697 # If the key already exists, the most natural698 # way to handle this is to aggregate repeated699 # keys into a single list.700 # <foo>1</foo><foo>2</foo> -> {'foo': [Node(1), Node(2)]}701 if isinstance(xml_dict[key], list):702 xml_dict[key].append(item)703 else:704 # Convert from a scalar to a list.705 xml_dict[key] = [xml_dict[key], item]706 else:707 xml_dict[key] = item708 return xml_dict709 def _create_event_stream(self, request: HttpRequest, shape: Shape) -> Any:710 # TODO handle event streams711 raise NotImplementedError("_create_event_stream")712class BaseJSONRequestParser(RequestParser, ABC):713 """714 The ``BaseJSONRequestParser`` is the base class for all JSON-based AWS service protocols.715 This base-class handles parsing the payload / body as JSON.716 """717 TIMESTAMP_FORMAT = "unixtimestamp"718 def _parse_structure(719 self,720 request: HttpRequest,721 shape: StructureShape,722 value: Optional[dict],723 uri_params: Mapping[str, Any] = None,724 ) -> Optional[dict]:725 if shape.is_document_type:726 final_parsed = value727 else:728 if value is None:729 # If the comes across the wire as "null" (None in python),730 # we should be returning this unchanged, instead of as an731 # empty dict.732 return None733 final_parsed = {}734 for member_name, member_shape in shape.members.items():735 json_name = member_shape.serialization.get("name", member_name)736 raw_value = value.get(json_name)737 parsed = self._parse_shape(request, member_shape, raw_value, uri_params)738 if parsed is not None or member_name in shape.required_members:739 # If the member is required, but not existing, we set it to None anyways740 final_parsed[member_name] = parsed741 return final_parsed742 def _parse_map(743 self,744 request: HttpRequest,745 shape: MapShape,746 value: Optional[dict],747 uri_params: Mapping[str, Any] = None,748 ) -> Optional[dict]:749 if value is None:750 return None751 parsed = {}752 key_shape = shape.key753 value_shape = shape.value754 for key, value in value.items():755 actual_key = self._parse_shape(request, key_shape, key, uri_params)756 actual_value = self._parse_shape(request, value_shape, value, uri_params)757 parsed[actual_key] = actual_value758 return parsed759 def _parse_body_as_json(self, request: HttpRequest) -> dict:760 body_contents = request.data761 if not body_contents:762 return {}763 if request.mimetype.startswith("application/x-amz-cbor"):764 try:765 return cbor2.loads(body_contents)766 except ValueError as e:767 raise ProtocolParserError("HTTP body could not be parsed as CBOR.") from e768 else:769 try:770 return request.get_json(force=True)771 except ValueError as e:772 raise ProtocolParserError("HTTP body could not be parsed as JSON.") from e773 def _parse_boolean(774 self, request: HttpRequest, shape: Shape, node: bool, uri_params: Mapping[str, Any] = None775 ) -> bool:776 return super()._noop_parser(request, shape, node, uri_params)777class JSONRequestParser(BaseJSONRequestParser):778 """779 The ``JSONRequestParser`` is responsible for parsing incoming requests for services which use the ``json``780 protocol.781 The requests for these services encode the majority of their parameters as JSON in the request body.782 The operation is defined in an HTTP header field.783 **Experimental:** This parser is still experimental.784 When implementing services with this parser, some edge cases might not work out-of-the-box.785 """786 @_handle_exceptions787 def parse(self, request: HttpRequest) -> Tuple[OperationModel, Any]:788 target = request.headers["X-Amz-Target"]789 # assuming that the last part of the target string (e.g., "x.y.z.MyAction") contains the operation name790 operation_name = target.rpartition(".")[2]791 operation = self.service.operation_model(operation_name)792 shape = operation.input_shape793 # There are no uri params in the query protocol794 uri_params = {}795 final_parsed = self._do_parse(request, shape, uri_params)796 return operation, final_parsed797 def _do_parse(798 self, request: HttpRequest, shape: Shape, uri_params: Mapping[str, Any] = None799 ) -> dict:800 parsed = {}801 if shape is not None:802 event_name = shape.event_stream_name803 if event_name:804 parsed = self._handle_event_stream(request, shape, event_name)805 else:806 parsed = self._handle_json_body(request, shape, uri_params)807 return parsed808 def _handle_event_stream(self, request: HttpRequest, shape: Shape, event_name: str):809 # TODO handle event streams810 raise NotImplementedError811 def _handle_json_body(812 self, request: HttpRequest, shape: Shape, uri_params: Mapping[str, Any] = None813 ) -> Any:814 # The json.loads() gives us the primitive JSON types, but we need to traverse the parsed JSON data to convert815 # to richer types (blobs, timestamps, etc.)816 parsed_json = self._parse_body_as_json(request)817 return self._parse_shape(request, shape, parsed_json, uri_params)818class RestJSONRequestParser(BaseRestRequestParser, BaseJSONRequestParser):819 """820 The ``RestJSONRequestParser`` is responsible for parsing incoming requests for services which use the ``rest-json``821 protocol.822 The requests for these services encode the majority of their parameters as JSON in the request body.823 The operation is defined by the HTTP method and the path suffix.824 **Experimental:** This parser is still experimental.825 When implementing services with this parser, some edge cases might not work out-of-the-box.826 """827 def _initial_body_parse(self, request: HttpRequest) -> dict:828 return self._parse_body_as_json(request)829 def _create_event_stream(self, request: HttpRequest, shape: Shape) -> Any:830 raise NotImplementedError831class EC2RequestParser(QueryRequestParser):832 """833 The ``EC2RequestParser`` is responsible for parsing incoming requests for services which use the ``ec2``834 protocol (which only is EC2). Protocol is quite similar to the ``query`` protocol with some small differences.835 **Experimental:** This parser is still experimental.836 When implementing services with this parser, some edge cases might not work out-of-the-box.837 """838 def _get_serialized_name(self, shape: Shape, default_name: str) -> str:839 # Returns the serialized name for the shape if it exists.840 # Otherwise it will return the passed in default_name.841 if "queryName" in shape.serialization:842 return shape.serialization["queryName"]843 elif "name" in shape.serialization:844 # A locationName is always capitalized on input for the ec2 protocol.845 name = shape.serialization["name"]846 return name[0].upper() + name[1:]847 else:848 return default_name849 def _get_list_key_prefix(self, shape: ListShape):850 # The EC2 protocol does not use a prefix notation for flattened lists851 return ""852class S3RequestParser(RestXMLRequestParser):853 @_handle_exceptions854 def parse(self, request: HttpRequest) -> Tuple[OperationModel, Any]:855 """Handle virtual-host-addressing for S3."""856 if (857 # TODO implement a more sophisticated determination if the host contains S3 virtual host addressing858 not request.host.startswith("s3.")859 and not request.host.startswith("localhost.")860 and not request.host.startswith("127.0.0.1")861 ):862 self._revert_virtual_host_style(request)863 return super().parse(request)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful