How to use the _noop_parser method in localstack

Best Python code snippet using localstack_python

parser.py

Source:parser.py Github

copy

Full Screen

...197 if value == "false":198 return False199 raise ValueError("cannot parse boolean value %s" % node)200 @_text_content201 def _noop_parser(self, _, __, node: any):202 return node203 _parse_character = _parse_string = _noop_parser204 _parse_double = _parse_float205 _parse_long = _parse_integer206 def _convert_str_to_timestamp(self, value: str, timestamp_format=None):207 if timestamp_format is None:208 timestamp_format = self.TIMESTAMP_FORMAT209 timestamp_format = timestamp_format.lower()210 converter = getattr(self, "_timestamp_%s" % timestamp_format)211 final_value = converter(value)212 return final_value213 @staticmethod214 def _timestamp_iso8601(date_string: str) -> datetime.datetime:215 return dateutil.parser.isoparse(date_string)216 @staticmethod217 def _timestamp_unixtimestamp(timestamp_string: str) -> datetime.datetime:218 return datetime.datetime.utcfromtimestamp(int(timestamp_string))219 @staticmethod220 def _timestamp_rfc822(datetime_string: str) -> datetime.datetime:221 return parsedate_to_datetime(datetime_string)222class QueryRequestParser(RequestParser):223 """224 The ``QueryRequestParser`` is responsible for parsing incoming requests for services which use the ``query``225 protocol. 
The requests for these services encode the majority of their parameters in the URL query string.226 **Experimental:** This parser is still experimental.227 When implementing services with this parser, some edge cases might not work out-of-the-box.228 """229 # Prefix for non-flattened lists230 NON_FLATTENED_LIST_PREFIX = "member."231 def parse(self, request: HttpRequest) -> Tuple[OperationModel, Any]:232 body = to_str(request["body"])233 instance = parse_qs(body, keep_blank_values=True)234 # The query parsing returns a list for each entry in the dict (this is how HTTP handles lists in query params).235 # However, the AWS Query format does not have any duplicates.236 # Therefore we take the first element of each entry in the dict.237 instance = {k: self._get_first(v) for k, v in instance.items()}238 operation: OperationModel = self.service.operation_model(instance["Action"])239 input_shape: StructureShape = operation.input_shape240 return operation, self._parse_shape(request, input_shape, instance)241 def _process_member(242 self, request: HttpRequest, member_name: str, member_shape: Shape, node: dict243 ):244 if isinstance(member_shape, (MapShape, ListShape, StructureShape)):245 # If we have a complex type, we filter the node and change it's keys to craft a new "context" for the246 # new hierarchy level247 sub_node = self._filter_node(member_name, node)248 else:249 # If it is a primitive type we just get the value from the dict250 sub_node = node.get(member_name)251 # The filtered node is processed and returned (or None if the sub_node is None)252 return self._parse_shape(request, member_shape, sub_node) if sub_node is not None else None253 def _parse_structure(self, request: HttpRequest, shape: StructureShape, node: dict) -> dict:254 result = dict()255 for member, member_shape in shape.members.items():256 # The key in the node is either the serialization config "name" of the shape, or the name of the member257 member_name = self._get_serialized_name(member_shape, 
member)258 # BUT, if it's flattened and a list, the name is defined by the list's member's name259 if member_shape.serialization.get("flattened"):260 if isinstance(member_shape, ListShape):261 member_name = self._get_serialized_name(member_shape.member, member)262 value = self._process_member(request, member_name, member_shape, node)263 if value is not None or member in shape.required_members:264 # If the member is required, but not existing, we explicitly set None265 result[member] = value266 return result if len(result) > 0 else None267 def _parse_map(self, request: HttpRequest, shape: MapShape, node: dict) -> dict:268 """269 This is what the node looks like for a flattened map::270 ::271 {272 "Attribute.1.Name": "MyKey",273 "Attribute.1.Value": "MyValue",274 "Attribute.2.Name": ...,275 ...276 }277 ::278 This function expects an already filtered / pre-processed node. The node dict would therefore look like:279 ::280 {281 "1.Name": "MyKey",282 "1.Value": "MyValue",283 "2.Name": ...284 }285 ::286 """287 key_prefix = ""288 # Non-flattened maps have an additional hierarchy level named "entry"289 # https://awslabs.github.io/smithy/1.0/spec/core/xml-traits.html#xmlflattened-trait290 if not shape.serialization.get("flattened"):291 key_prefix += "entry."292 result = dict()293 i = 0294 while True:295 i += 1296 # The key and value can be renamed (with their serialization config's "name").297 # By default they are called "key" and "value".298 key_name = f"{key_prefix}{i}.{self._get_serialized_name(shape.key, 'key')}"299 value_name = f"{key_prefix}{i}.{self._get_serialized_name(shape.value, 'value')}"300 # We process the key and value individually301 k = self._process_member(request, key_name, shape.key, node)302 v = self._process_member(request, value_name, shape.value, node)303 if k is None or v is None:304 # technically, if one exists but not the other, then that would be an invalid request305 break306 result[k] = v307 return result if len(result) > 0 else None308 def 
_parse_list(self, request: HttpRequest, shape: ListShape, node: dict) -> list:309 """310 Some actions take lists of parameters. These lists are specified using the param.[member.]n notation.311 The "member" is used if the list is not flattened.312 Values of n are integers starting from 1.313 For example, a list with two elements looks like this:314 - Flattened: &AttributeName.1=first&AttributeName.2=second315 - Non-flattened: &AttributeName.member.1=first&AttributeName.member.2=second316 This function expects an already filtered / processed node. The node dict would therefore look like:317 ::318 {319 "1": "first",320 "2": "second",321 "3": ...322 }323 ::324 """325 key_prefix = ""326 # Non-flattened lists have an additional hierarchy level named "member"327 # https://awslabs.github.io/smithy/1.0/spec/core/xml-traits.html#xmlflattened-trait328 if not shape.serialization.get("flattened"):329 key_prefix += self.NON_FLATTENED_LIST_PREFIX330 # We collect the list value as well as the integer indicating the list position so we can331 # later sort the list by the position, in case they attribute values are unordered332 result: List[Tuple[int, Any]] = list()333 i = 0334 while True:335 i += 1336 key_name = f"{key_prefix}{i}"337 value = self._process_member(request, key_name, shape.member, node)338 if value is None:339 break340 result.append((i, value))341 return [r[1] for r in sorted(result)] if len(result) > 0 else None342 @staticmethod343 def _get_first(node: any) -> any:344 if isinstance(node, (list, tuple)):345 return node[0]346 return node347 @staticmethod348 def _filter_node(name: str, node: dict) -> dict:349 """Filters the node dict for entries where the key starts with the given name."""350 filtered = {k[len(name) + 1 :]: v for k, v in node.items() if k.startswith(name)}351 return filtered if len(filtered) > 0 else None352 def _get_serialized_name(self, shape: Shape, default_name: str) -> str:353 """354 Returns the serialized name for the shape if it exists.355 
Otherwise it will return the given default_name.356 """357 return shape.serialization.get("name", default_name)358class BaseRestRequestParser(RequestParser):359 """360 The ``BaseRestRequestParser`` is the base class for all "resty" AWS service protocols.361 The operation which should be invoked is determined based on the HTTP method and the path suffix.362 The body encoding is done in the respective subclasses.363 """364 def __init__(self, service: ServiceModel) -> None:365 super().__init__(service)366 # When parsing a request, we need to lookup the operation based on the HTTP method and URI.367 # Therefore we create a mapping when the parser is initialized.368 self.operation_lookup = defaultdict(lambda: defaultdict(OperationModel))369 for operation in service.operation_names:370 operation_model = service.operation_model(operation)371 http = operation_model.http372 if len(http) > 0:373 method = http.get("method")374 request_uri = http.get("requestUri")375 self.operation_lookup[method][request_uri] = operation_model376 def parse(self, request: HttpRequest) -> Tuple[OperationModel, Any]:377 operation = self.operation_lookup[request["method"]][request["path"]]378 shape: StructureShape = operation.input_shape379 final_parsed = {}380 if shape is not None:381 member_shapes = shape.members382 self._parse_non_payload_attrs(request, member_shapes, final_parsed)383 self._parse_payload(request, shape, member_shapes, final_parsed)384 return operation, final_parsed385 def _parse_payload(386 self,387 request: HttpRequest,388 shape: Shape,389 member_shapes: Dict[str, Shape],390 final_parsed: dict,391 ) -> None:392 """Parses all attributes which are located in the payload / body of the incoming request."""393 if "payload" in shape.serialization:394 # If a payload is specified in the output shape, then only that shape is used for the body payload.395 payload_member_name = shape.serialization["payload"]396 body_shape = member_shapes[payload_member_name]397 if 
body_shape.serialization.get("eventstream"):398 body = self._create_event_stream(request, body_shape)399 final_parsed[payload_member_name] = body400 elif body_shape.type_name in ["string", "blob"]:401 # This is a stream402 body = request["body"]403 if isinstance(body, bytes):404 body = body.decode(self.DEFAULT_ENCODING)405 final_parsed[payload_member_name] = body406 else:407 original_parsed = self._initial_body_parse(request["body"])408 final_parsed[payload_member_name] = self._parse_shape(409 request, body_shape, original_parsed410 )411 else:412 original_parsed = self._initial_body_parse(request["body"])413 body_parsed = self._parse_shape(request, shape, original_parsed)414 final_parsed.update(body_parsed)415 def _parse_non_payload_attrs(416 self, request: HttpRequest, member_shapes: dict, final_parsed: dict417 ) -> None:418 """Parses all attributes which are not located in the payload."""419 headers = request["headers"]420 for name in member_shapes:421 member_shape = member_shapes[name]422 location = member_shape.serialization.get("location")423 if location is None:424 continue425 elif location == "headers":426 final_parsed[name] = self._parse_header_map(member_shape, headers)427 elif location == "header":428 header_name = member_shape.serialization.get("name", name)429 if header_name in headers:430 final_parsed[name] = self._parse_shape(431 request, member_shape, headers[header_name]432 )433 else:434 raise RequestParserError("Unknown shape location '%s'." 
% location)435 @staticmethod436 def _parse_header_map(shape: Shape, headers: dict) -> dict:437 # Note that headers are case insensitive, so we .lower() all header names and header prefixes.438 parsed = {}439 prefix = shape.serialization.get("name", "").lower()440 for header_name in headers:441 if header_name.lower().startswith(prefix):442 # The key name inserted into the parsed hash strips off the prefix.443 name = header_name[len(prefix) :]444 parsed[name] = headers[header_name]445 return parsed446 def _initial_body_parse(self, body_contents: bytes) -> any:447 """448 This method executes the initial XML or JSON parsing of the body.449 The parsed body will afterwards still be walked through and the nodes will be converted to the appropriate450 types, but this method does the first round of parsing.451 :param body_contents: which should be parsed452 :return: depending on the actual implementation453 """454 raise NotImplementedError("_initial_body_parse")455 def _create_event_stream(self, request: dict, shape: Shape) -> any:456 # TODO handle event streams457 raise NotImplementedError("_create_event_stream")458class RestXMLRequestParser(BaseRestRequestParser):459 """460 The ``RestXMLRequestParser`` is responsible for parsing incoming requests for services which use the ``rest-xml``461 protocol. 
The requests for these services encode the majority of their parameters as XML in the request body.462 **Experimental:** This parser is still experimental.463 When implementing services with this parser, some edge cases might not work out-of-the-box.464 """465 def __init__(self, service_model: ServiceModel):466 super(RestXMLRequestParser, self).__init__(service_model)467 self._namespace_re = re.compile("{.*}")468 def _initial_body_parse(self, xml_string: str) -> ETree.Element:469 if not xml_string:470 return ETree.Element("")471 return self._parse_xml_string_to_dom(xml_string)472 def _parse_structure(473 self, request: HttpRequest, shape: StructureShape, node: ETree.Element474 ) -> dict:475 parsed = {}476 xml_dict = self._build_name_to_xml_node(node)477 for member_name, member_shape in shape.members.items():478 if "location" in member_shape.serialization or member_shape.serialization.get(479 "eventheader"480 ):481 # All members with locations have already been handled in ``_parse_non_payload_attrs``,482 # so we don't need to parse these members.483 continue484 xml_name = self._member_key_name(member_shape, member_name)485 member_node = xml_dict.get(xml_name)486 if member_node is not None:487 parsed[member_name] = self._parse_shape(request, member_shape, member_node)488 elif member_shape.serialization.get("xmlAttribute"):489 attributes = {}490 location_name = member_shape.serialization["name"]491 for key, value in node.attrib.items():492 new_key = self._namespace_re.sub(location_name.split(":")[0] + ":", key)493 attributes[new_key] = value494 if location_name in attributes:495 parsed[member_name] = attributes[location_name]496 elif member_name in shape.required_members:497 # If the member is required, but not existing, we explicitly set None498 parsed[member_name] = None499 return parsed500 def _parse_map(self, request: HttpRequest, shape: MapShape, node: dict) -> dict:501 parsed = {}502 key_shape = shape.key503 value_shape = shape.value504 key_location_name = 
key_shape.serialization.get("name", "key")505 value_location_name = value_shape.serialization.get("name", "value")506 if shape.serialization.get("flattened") and not isinstance(node, list):507 node = [node]508 for keyval_node in node:509 key_name = val_name = None510 for single_pair in keyval_node:511 # Within each <entry> there's a <key> and a <value>512 tag_name = self._node_tag(single_pair)513 if tag_name == key_location_name:514 key_name = self._parse_shape(request, key_shape, single_pair)515 elif tag_name == value_location_name:516 val_name = self._parse_shape(request, value_shape, single_pair)517 else:518 raise RequestParserError("Unknown tag: %s" % tag_name)519 parsed[key_name] = val_name520 return parsed521 def _parse_list(self, request: HttpRequest, shape: ListShape, node: dict) -> list:522 # When we use _build_name_to_xml_node, repeated elements are aggregated523 # into a list. However, we can't tell the difference between a scalar524 # value and a single element flattened list. So before calling the525 # real _handle_list, we know that "node" should actually be a list if526 # it's flattened, and if it's not, then we make it a one element list.527 if shape.serialization.get("flattened") and not isinstance(node, list):528 node = [node]529 return super(RestXMLRequestParser, self)._parse_list(request, shape, node)530 def _node_tag(self, node: ETree.Element) -> str:531 return self._namespace_re.sub("", node.tag)532 @staticmethod533 def _member_key_name(shape: Shape, member_name: str) -> str:534 # This method is needed because we have to special case flattened list535 # with a serialization name. 
If this is the case we use the536 # locationName from the list's member shape as the key name for the537 # surrounding structure.538 if isinstance(shape, ListShape) and shape.serialization.get("flattened"):539 list_member_serialized_name = shape.member.serialization.get("name")540 if list_member_serialized_name is not None:541 return list_member_serialized_name542 serialized_name = shape.serialization.get("name")543 if serialized_name is not None:544 return serialized_name545 return member_name546 def _parse_xml_string_to_dom(self, xml_string: str) -> ETree.Element:547 try:548 parser = ETree.XMLParser(target=ETree.TreeBuilder(), encoding=self.DEFAULT_ENCODING)549 parser.feed(xml_string)550 root = parser.close()551 except ETree.ParseError as e:552 raise RequestParserError(553 "Unable to parse request (%s), invalid XML received:\n%s" % (e, xml_string)554 )555 return root556 def _build_name_to_xml_node(self, parent_node: Union[list, ETree.Element]) -> dict:557 # If the parent node is actually a list. We should not be trying558 # to serialize it to a dictionary. 
Instead, return the first element559 # in the list.560 if isinstance(parent_node, list):561 return self._build_name_to_xml_node(parent_node[0])562 xml_dict = {}563 for item in parent_node:564 key = self._node_tag(item)565 if key in xml_dict:566 # If the key already exists, the most natural567 # way to handle this is to aggregate repeated568 # keys into a single list.569 # <foo>1</foo><foo>2</foo> -> {'foo': [Node(1), Node(2)]}570 if isinstance(xml_dict[key], list):571 xml_dict[key].append(item)572 else:573 # Convert from a scalar to a list.574 xml_dict[key] = [xml_dict[key], item]575 else:576 xml_dict[key] = item577 return xml_dict578 def _create_event_stream(self, request: dict, shape: Shape) -> any:579 # TODO handle event streams580 raise NotImplementedError("_create_event_stream")581class BaseJSONRequestParser(RequestParser, ABC):582 """583 The ``BaseJSONRequestParser`` is the base class for all JSON-based AWS service protocols.584 This base-class handles parsing the payload / body as JSON.585 """586 TIMESTAMP_FORMAT = "unixtimestamp"587 def _parse_structure(588 self, request: HttpRequest, shape: StructureShape, value: Optional[dict]589 ) -> Optional[dict]:590 if shape.is_document_type:591 final_parsed = value592 else:593 if value is None:594 # If the comes across the wire as "null" (None in python),595 # we should be returning this unchanged, instead of as an596 # empty dict.597 return None598 final_parsed = {}599 for member_name, member_shape in shape.members.items():600 json_name = member_shape.serialization.get("name", member_name)601 raw_value = value.get(json_name)602 if raw_value is not None:603 final_parsed[member_name] = self._parse_shape(request, member_shape, raw_value)604 elif member_name in shape.required_members:605 # If the member is required, but not existing, we explicitly set None606 final_parsed[member_name] = None607 return final_parsed608 def _parse_map(609 self, request: HttpRequest, shape: MapShape, value: Optional[dict]610 ) -> 
Optional[dict]:611 if value is None:612 return None613 parsed = {}614 key_shape = shape.key615 value_shape = shape.value616 for key, value in value.items():617 actual_key = self._parse_shape(request, key_shape, key)618 actual_value = self._parse_shape(request, value_shape, value)619 parsed[actual_key] = actual_value620 return parsed621 def _parse_body_as_json(self, body_contents: bytes) -> dict:622 if not body_contents:623 return {}624 body = body_contents.decode(self.DEFAULT_ENCODING)625 try:626 original_parsed = json.loads(body)627 return original_parsed628 except ValueError:629 raise RequestParserError("HTTP body could not be parsed as JSON.")630 def _parse_boolean(self, request: HttpRequest, shape: Shape, node: bool) -> bool:631 return super()._noop_parser(request, shape, node)632class JSONRequestParser(BaseJSONRequestParser):633 """634 The ``JSONRequestParser`` is responsible for parsing incoming requests for services which use the ``json``635 protocol.636 The requests for these services encode the majority of their parameters as JSON in the request body.637 The operation is defined in an HTTP header field.638 **Experimental:** This parser is still experimental.639 When implementing services with this parser, some edge cases might not work out-of-the-box.640 """641 def parse(self, request: HttpRequest) -> Tuple[OperationModel, Any]:642 target = request["headers"]["X-Amz-Target"]643 _, operation_name = target.split(".")644 operation = self.service.operation_model(operation_name)645 shape = operation.input_shape...

Full Screen

Full Screen

pcs.py

Source:pcs.py Github

copy

Full Screen

import collections
import json
from eventool import ssh_cmds
from eventool import logger
import xmltodict
# from lxml import etree as xml_parser
from xml.dom import minidom as xml_parser
from eventool import parsers

LOG = logger.getLogger(__name__)


@parsers.cli_command("pcs", subparser="action")
class PCSMgmt(ssh_cmds.tmp_cmd):
    """Query Pacemaker (PCS) cluster state over an SSH command wrapper.

    The cluster status is fetched as XML (``crm_mon -X``), parsed into a
    ``minidom`` document and searched for ``clone`` / ``resource`` elements
    by their ``id`` attribute.
    """

    def __init__(self, ssh):
        super(PCSMgmt, self).__init__(ssh)
        # xmltodict view of the last XML status that was parsed
        self._dict_xml = None
        # cached result of status_xml(); filled lazily by ``cluster``
        self._cluster = None
        # self._haproxy_conf = None

    @property
    def cluster(self):
        """Cluster status, fetched via ``status_xml()`` on first access."""
        self._cluster = self._cluster or self.status_xml()
        return self._cluster

    @parsers.cli_choice(parser="pcs", subparser="action")
    @ssh_cmds.command_decorator
    def status(self):
        """Returns full PCS status. """
        cmd = "pcs status"
        # presumably command_decorator runs ``cmd`` and feeds its output to
        # the returned parser -- confirm against ssh_cmds
        return cmd, self._noop_parser

    @ssh_cmds.command_decorator
    def status_xml(self):
        """Build the ``crm_mon -X`` command paired with the XML parser."""
        LOG.debug("Get PCS data in XML format from HA node")
        cmd = "crm_mon -X"
        return cmd, self._parse_xml

    def _parse_xml(self, raw_xml):
        """Parse ``raw_xml`` into a minidom document.

        As a side effect the same XML is also stored, converted by
        ``xmltodict``, in ``self._dict_xml``.
        """
        self._dict_xml = xmltodict.parse(raw_xml)
        # return raw_xml
        return xml_parser.parseString(raw_xml)

    def get_resource_node(self, resource):
        """Return the ``name`` attribute of the single ``node`` under ``resource``.

        :raises ValueError: from the unpacking below, when ``resource`` does
            not contain exactly one ``node`` element.
        """
        [node] = resource.getElementsByTagName("node")
        return node.getAttribute("name")

    def find_service_node(self, service):
        """Return the node name hosting the resource whose id is ``service``.

        :raises Exception: when no resource, or more than one resource,
            matches ``service`` exactly.
        """
        resources = self.find_resource(service)
        if not resources:
            # TODO(yfried): replace with a better exception
            raise Exception("resource {0} NotFound".
                            format(service))
        if len(resources) > 1:
            # TODO(yfried): replace with a better exception
            # ``{s}`` is a named field, so the value must be passed by
            # keyword; a positional argument raises KeyError('s') instead
            # of producing this message.
            raise Exception("Found multiple matches for resource {s}".
                            format(s=service))
        resource = resources.pop()
        return self.get_resource_node(resource)

    def get_vip_dest(self, proj):
        """Return the node name hosting the VIP resource of ``proj``.

        The VIP is looked up as any resource whose id contains
        ``ip-<proj>-adm``.

        :raises Exception: when no such resource is found.
        """
        vip = "-".join(["ip", proj, "adm"])
        vips = self.find_resource(vip, exact_match=False)
        if not vips:
            # todo(yfried) better exception
            raise Exception("No vip found for: {proj}. Unable to find vip "
                            "named: {vip}".format(proj=proj, vip=vip))
        vip_resource = vips.pop()
        return self.get_resource_node(vip_resource)

    @staticmethod
    def strip_node_name(name):
        """Return ``name`` without its leading ``pcmk-`` prefix."""
        prefix = "pcmk-"
        # NOTE: asserts are removed under ``python -O``; in that mode a name
        # without the prefix would be sliced anyway.
        assert name.startswith(prefix)
        return name[len(prefix):]

    @staticmethod
    def _find_in_tree(root, tag, id, exact=True):
        """Return the ``tag`` elements under ``root`` whose ``id`` matches.

        :param root: DOM node to search (anything with getElementsByTagName)
        :param tag: element tag name to look for
        :param id: value compared against each element's ``id`` attribute
        :param exact: True - attribute must equal ``id``;
            False - attribute only has to contain ``id``
        """
        # pick the str method implementing the requested comparison
        match = "__eq__" if exact else "__contains__"
        return [r for r in root.getElementsByTagName(tag)
                if getattr(r.getAttribute("id"), match)(id)]

    def find_clone(self, service):
        """Return the list of ``clone`` elements with id ``<service>-clone``.

        :raises Exception: when more than one clone matches.
        """
        TAG = "clone"
        service = "%s-clone" % service
        x_list = self._find_in_tree(self.cluster, TAG, service)
        if len(x_list) > 1:
            # TODO(yfried): replace with a better exception
            # ``{s}`` is a named field -> pass the value by keyword, otherwise
            # str.format raises KeyError('s') instead of this message.
            raise Exception("Found multiple matches for clone {s}".
                            format(s=service))
        return x_list

    def get_active_resources(self, clone):
        """Returns list of active resource for :clone

        list is of len=1 if clone is A/P, if clone is A/A will return all
        resource of this clone

        :param clone: ``clone`` DOM element whose id ends with ``-clone``
        :return: the clone's resources with active == 'true' and
            role == 'Started'
        """
        # get service name from clone name
        service = clone.getAttribute("id")[:-len("-clone")]
        resources = self._find_in_tree(clone, "resource", service)

        def active(resource):
            return resource.getAttribute("active") == 'true' and \
                   resource.getAttribute("role") == 'Started'

        return [r for r in resources if active(r)]

    def find_resource(self, resource_id, exact_match=True):
        """Return the ``resource`` elements of the cluster matching an id.

        :param resource_id: name of the resource
        :param exact_match: if False - return any resource whose name
        contains the :resource_id
        :return: list of matching DOM elements (possibly empty)
        """
        TAG = "resource"
        match = "exact " if exact_match else ""
        LOG.debug("find " + match + "match for resource %s" % resource_id)
        x_list = self._find_in_tree(self.cluster, TAG, resource_id,
                                    exact=exact_match)
        return x_list

    # @ssh_cmds.cli_choice(parser="pcs", handler="action")
    # def cluster_nodes(self):
    #     out = self.cluster.get("nodes")["node"]...

Full Screen

Full Screen

ssh_cmds.py

Source:ssh_cmds.py Github

copy

Full Screen

...38 """39 assert output == ""40 return True41 @staticmethod42 def _noop_parser(out):43 """don't parse """44 return out45# def parser_exec(parser):46# main.PARSERS[parser]["func"]47# def decorator(c):48# @functools.wraps(c)49# def class_dec(*args, **kwargs):50# return c(*args, **kwargs)51# return class_dec52# return decorator53@parsers.cli_command("raw", subparser="action")54class RAWcmd(tmp_cmd):55 @parsers.add_argument(dest="input", nargs="*", default=None)56 @parsers.cli_choice(parser="raw", subparser="action")...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful