Best Python code snippet using localstack_python
parser.py
Source:parser.py  
...197        if value == "false":198            return False199        raise ValueError("cannot parse boolean value %s" % node)200    @_text_content201    def _noop_parser(self, _, __, node: any):202        return node203    _parse_character = _parse_string = _noop_parser204    _parse_double = _parse_float205    _parse_long = _parse_integer206    def _convert_str_to_timestamp(self, value: str, timestamp_format=None):207        if timestamp_format is None:208            timestamp_format = self.TIMESTAMP_FORMAT209        timestamp_format = timestamp_format.lower()210        converter = getattr(self, "_timestamp_%s" % timestamp_format)211        final_value = converter(value)212        return final_value213    @staticmethod214    def _timestamp_iso8601(date_string: str) -> datetime.datetime:215        return dateutil.parser.isoparse(date_string)216    @staticmethod217    def _timestamp_unixtimestamp(timestamp_string: str) -> datetime.datetime:218        return datetime.datetime.utcfromtimestamp(int(timestamp_string))219    @staticmethod220    def _timestamp_rfc822(datetime_string: str) -> datetime.datetime:221        return parsedate_to_datetime(datetime_string)222class QueryRequestParser(RequestParser):223    """224    The ``QueryRequestParser`` is responsible for parsing incoming requests for services which use the ``query``225    protocol. The requests for these services encode the majority of their parameters in the URL query string.226    **Experimental:** This parser is still experimental.227    When implementing services with this parser, some edge cases might not work out-of-the-box.228    """229    # Prefix for non-flattened lists230    NON_FLATTENED_LIST_PREFIX = "member."231    def parse(self, request: HttpRequest) -> Tuple[OperationModel, Any]:232        body = to_str(request["body"])233        instance = parse_qs(body, keep_blank_values=True)234        # The query parsing returns a list for each entry in the dict (this is how HTTP handles lists in query params).235        # However, the AWS Query format does not have any duplicates.236        # Therefore we take the first element of each entry in the dict.237        instance = {k: self._get_first(v) for k, v in instance.items()}238        operation: OperationModel = self.service.operation_model(instance["Action"])239        input_shape: StructureShape = operation.input_shape240        return operation, self._parse_shape(request, input_shape, instance)241    def _process_member(242        self, request: HttpRequest, member_name: str, member_shape: Shape, node: dict243    ):244        if isinstance(member_shape, (MapShape, ListShape, StructureShape)):245            # If we have a complex type, we filter the node and change it's keys to craft a new "context" for the246            # new hierarchy level247            sub_node = self._filter_node(member_name, node)248        else:249            # If it is a primitive type we just get the value from the dict250            sub_node = node.get(member_name)251        # The filtered node is processed and returned (or None if the sub_node is None)252        return self._parse_shape(request, member_shape, sub_node) if sub_node is not None else None253    def _parse_structure(self, request: HttpRequest, shape: StructureShape, node: dict) -> dict:254        result = dict()255        for member, member_shape in shape.members.items():256            # The key in the node is either the serialization config "name" of the shape, or the name of the member257            member_name = self._get_serialized_name(member_shape, member)258            # BUT, if it's flattened and a list, the name is defined by the list's member's name259            if member_shape.serialization.get("flattened"):260                if isinstance(member_shape, ListShape):261                    member_name = self._get_serialized_name(member_shape.member, member)262            value = self._process_member(request, member_name, member_shape, node)263            if value is not None or member in shape.required_members:264                # If the member is required, but not existing, we explicitly set None265                result[member] = value266        return result if len(result) > 0 else None267    def _parse_map(self, request: HttpRequest, shape: MapShape, node: dict) -> dict:268        """269        This is what the node looks like for a flattened map::270        ::271          {272              "Attribute.1.Name": "MyKey",273              "Attribute.1.Value": "MyValue",274              "Attribute.2.Name": ...,275              ...276          }277        ::278        This function expects an already filtered / pre-processed node. The node dict would therefore look like:279        ::280          {281              "1.Name": "MyKey",282              "1.Value": "MyValue",283              "2.Name": ...284          }285        ::286        """287        key_prefix = ""288        # Non-flattened maps have an additional hierarchy level named "entry"289        # https://awslabs.github.io/smithy/1.0/spec/core/xml-traits.html#xmlflattened-trait290        if not shape.serialization.get("flattened"):291            key_prefix += "entry."292        result = dict()293        i = 0294        while True:295            i += 1296            # The key and value can be renamed (with their serialization config's "name").297            # By default they are called "key" and "value".298            key_name = f"{key_prefix}{i}.{self._get_serialized_name(shape.key, 'key')}"299            value_name = f"{key_prefix}{i}.{self._get_serialized_name(shape.value, 'value')}"300            # We process the key and value individually301            k = self._process_member(request, key_name, shape.key, node)302            v = self._process_member(request, value_name, shape.value, node)303            if k is None or v is None:304                # technically, if one exists but not the other, then that would be an invalid request305                break306            result[k] = v307        return result if len(result) > 0 else None308    def _parse_list(self, request: HttpRequest, shape: ListShape, node: dict) -> list:309        """310        Some actions take lists of parameters. These lists are specified using the param.[member.]n notation.311        The "member" is used if the list is not flattened.312        Values of n are integers starting from 1.313        For example, a list with two elements looks like this:314        - Flattened: &AttributeName.1=first&AttributeName.2=second315        - Non-flattened: &AttributeName.member.1=first&AttributeName.member.2=second316        This function expects an already filtered / processed node. The node dict would therefore look like:317        ::318          {319              "1": "first",320              "2": "second",321              "3": ...322          }323        ::324        """325        key_prefix = ""326        # Non-flattened lists have an additional hierarchy level named "member"327        # https://awslabs.github.io/smithy/1.0/spec/core/xml-traits.html#xmlflattened-trait328        if not shape.serialization.get("flattened"):329            key_prefix += self.NON_FLATTENED_LIST_PREFIX330        # We collect the list value as well as the integer indicating the list position so we can331        # later sort the list by the position, in case they attribute values are unordered332        result: List[Tuple[int, Any]] = list()333        i = 0334        while True:335            i += 1336            key_name = f"{key_prefix}{i}"337            value = self._process_member(request, key_name, shape.member, node)338            if value is None:339                break340            result.append((i, value))341        return [r[1] for r in sorted(result)] if len(result) > 0 else None342    @staticmethod343    def _get_first(node: any) -> any:344        if isinstance(node, (list, tuple)):345            return node[0]346        return node347    @staticmethod348    def _filter_node(name: str, node: dict) -> dict:349        """Filters the node dict for entries where the key starts with the given name."""350        filtered = {k[len(name) + 1 :]: v for k, v in node.items() if k.startswith(name)}351        return filtered if len(filtered) > 0 else None352    def _get_serialized_name(self, shape: Shape, default_name: str) -> str:353        """354        Returns the serialized name for the shape if it exists.355        Otherwise it will return the given default_name.356        """357        return shape.serialization.get("name", default_name)358class BaseRestRequestParser(RequestParser):359    """360    The ``BaseRestRequestParser`` is the base class for all "resty" AWS service protocols.361    The operation which should be invoked is determined based on the HTTP method and the path suffix.362    The body encoding is done in the respective subclasses.363    """364    def __init__(self, service: ServiceModel) -> None:365        super().__init__(service)366        # When parsing a request, we need to lookup the operation based on the HTTP method and URI.367        # Therefore we create a mapping when the parser is initialized.368        self.operation_lookup = defaultdict(lambda: defaultdict(OperationModel))369        for operation in service.operation_names:370            operation_model = service.operation_model(operation)371            http = operation_model.http372            if len(http) > 0:373                method = http.get("method")374                request_uri = http.get("requestUri")375                self.operation_lookup[method][request_uri] = operation_model376    def parse(self, request: HttpRequest) -> Tuple[OperationModel, Any]:377        operation = self.operation_lookup[request["method"]][request["path"]]378        shape: StructureShape = operation.input_shape379        final_parsed = {}380        if shape is not None:381            member_shapes = shape.members382            self._parse_non_payload_attrs(request, member_shapes, final_parsed)383            self._parse_payload(request, shape, member_shapes, final_parsed)384        return operation, final_parsed385    def _parse_payload(386        self,387        request: HttpRequest,388        shape: Shape,389        member_shapes: Dict[str, Shape],390        final_parsed: dict,391    ) -> None:392        """Parses all attributes which are located in the payload / body of the incoming request."""393        if "payload" in shape.serialization:394            # If a payload is specified in the output shape, then only that shape is used for the body payload.395            payload_member_name = shape.serialization["payload"]396            body_shape = member_shapes[payload_member_name]397            if body_shape.serialization.get("eventstream"):398                body = self._create_event_stream(request, body_shape)399                final_parsed[payload_member_name] = body400            elif body_shape.type_name in ["string", "blob"]:401                # This is a stream402                body = request["body"]403                if isinstance(body, bytes):404                    body = body.decode(self.DEFAULT_ENCODING)405                final_parsed[payload_member_name] = body406            else:407                original_parsed = self._initial_body_parse(request["body"])408                final_parsed[payload_member_name] = self._parse_shape(409                    request, body_shape, original_parsed410                )411        else:412            original_parsed = self._initial_body_parse(request["body"])413            body_parsed = self._parse_shape(request, shape, original_parsed)414            final_parsed.update(body_parsed)415    def _parse_non_payload_attrs(416        self, request: HttpRequest, member_shapes: dict, final_parsed: dict417    ) -> None:418        """Parses all attributes which are not located in the payload."""419        headers = request["headers"]420        for name in member_shapes:421            member_shape = member_shapes[name]422            location = member_shape.serialization.get("location")423            if location is None:424                continue425            elif location == "headers":426                final_parsed[name] = self._parse_header_map(member_shape, headers)427            elif location == "header":428                header_name = member_shape.serialization.get("name", name)429                if header_name in headers:430                    final_parsed[name] = self._parse_shape(431                        request, member_shape, headers[header_name]432                    )433            else:434                raise RequestParserError("Unknown shape location '%s'." % location)435    @staticmethod436    def _parse_header_map(shape: Shape, headers: dict) -> dict:437        # Note that headers are case insensitive, so we .lower() all header names and header prefixes.438        parsed = {}439        prefix = shape.serialization.get("name", "").lower()440        for header_name in headers:441            if header_name.lower().startswith(prefix):442                # The key name inserted into the parsed hash strips off the prefix.443                name = header_name[len(prefix) :]444                parsed[name] = headers[header_name]445        return parsed446    def _initial_body_parse(self, body_contents: bytes) -> any:447        """448        This method executes the initial XML or JSON parsing of the body.449        The parsed body will afterwards still be walked through and the nodes will be converted to the appropriate450        types, but this method does the first round of parsing.451        :param body_contents: which should be parsed452        :return: depending on the actual implementation453        """454        raise NotImplementedError("_initial_body_parse")455    def _create_event_stream(self, request: dict, shape: Shape) -> any:456        # TODO handle event streams457        raise NotImplementedError("_create_event_stream")458class RestXMLRequestParser(BaseRestRequestParser):459    """460    The ``RestXMLRequestParser`` is responsible for parsing incoming requests for services which use the ``rest-xml``461    protocol. The requests for these services encode the majority of their parameters as XML in the request body.462    **Experimental:** This parser is still experimental.463    When implementing services with this parser, some edge cases might not work out-of-the-box.464    """465    def __init__(self, service_model: ServiceModel):466        super(RestXMLRequestParser, self).__init__(service_model)467        self._namespace_re = re.compile("{.*}")468    def _initial_body_parse(self, xml_string: str) -> ETree.Element:469        if not xml_string:470            return ETree.Element("")471        return self._parse_xml_string_to_dom(xml_string)472    def _parse_structure(473        self, request: HttpRequest, shape: StructureShape, node: ETree.Element474    ) -> dict:475        parsed = {}476        xml_dict = self._build_name_to_xml_node(node)477        for member_name, member_shape in shape.members.items():478            if "location" in member_shape.serialization or member_shape.serialization.get(479                "eventheader"480            ):481                # All members with locations have already been handled in ``_parse_non_payload_attrs``,482                # so we don't need to parse these members.483                continue484            xml_name = self._member_key_name(member_shape, member_name)485            member_node = xml_dict.get(xml_name)486            if member_node is not None:487                parsed[member_name] = self._parse_shape(request, member_shape, member_node)488            elif member_shape.serialization.get("xmlAttribute"):489                attributes = {}490                location_name = member_shape.serialization["name"]491                for key, value in node.attrib.items():492                    new_key = self._namespace_re.sub(location_name.split(":")[0] + ":", key)493                    attributes[new_key] = value494                if location_name in attributes:495                    parsed[member_name] = attributes[location_name]496            elif member_name in shape.required_members:497                # If the member is required, but not existing, we explicitly set None498                parsed[member_name] = None499        return parsed500    def _parse_map(self, request: HttpRequest, shape: MapShape, node: dict) -> dict:501        parsed = {}502        key_shape = shape.key503        value_shape = shape.value504        key_location_name = key_shape.serialization.get("name", "key")505        value_location_name = value_shape.serialization.get("name", "value")506        if shape.serialization.get("flattened") and not isinstance(node, list):507            node = [node]508        for keyval_node in node:509            key_name = val_name = None510            for single_pair in keyval_node:511                # Within each <entry> there's a <key> and a <value>512                tag_name = self._node_tag(single_pair)513                if tag_name == key_location_name:514                    key_name = self._parse_shape(request, key_shape, single_pair)515                elif tag_name == value_location_name:516                    val_name = self._parse_shape(request, value_shape, single_pair)517                else:518                    raise RequestParserError("Unknown tag: %s" % tag_name)519            parsed[key_name] = val_name520        return parsed521    def _parse_list(self, request: HttpRequest, shape: ListShape, node: dict) -> list:522        # When we use _build_name_to_xml_node, repeated elements are aggregated523        # into a list. However, we can't tell the difference between a scalar524        # value and a single element flattened list. So before calling the525        # real _handle_list, we know that "node" should actually be a list if526        # it's flattened, and if it's not, then we make it a one element list.527        if shape.serialization.get("flattened") and not isinstance(node, list):528            node = [node]529        return super(RestXMLRequestParser, self)._parse_list(request, shape, node)530    def _node_tag(self, node: ETree.Element) -> str:531        return self._namespace_re.sub("", node.tag)532    @staticmethod533    def _member_key_name(shape: Shape, member_name: str) -> str:534        # This method is needed because we have to special case flattened list535        # with a serialization name.  If this is the case we use the536        # locationName from the list's member shape as the key name for the537        # surrounding structure.538        if isinstance(shape, ListShape) and shape.serialization.get("flattened"):539            list_member_serialized_name = shape.member.serialization.get("name")540            if list_member_serialized_name is not None:541                return list_member_serialized_name542        serialized_name = shape.serialization.get("name")543        if serialized_name is not None:544            return serialized_name545        return member_name546    def _parse_xml_string_to_dom(self, xml_string: str) -> ETree.Element:547        try:548            parser = ETree.XMLParser(target=ETree.TreeBuilder(), encoding=self.DEFAULT_ENCODING)549            parser.feed(xml_string)550            root = parser.close()551        except ETree.ParseError as e:552            raise RequestParserError(553                "Unable to parse request (%s), invalid XML received:\n%s" % (e, xml_string)554            )555        return root556    def _build_name_to_xml_node(self, parent_node: Union[list, ETree.Element]) -> dict:557        # If the parent node is actually a list. We should not be trying558        # to serialize it to a dictionary. Instead, return the first element559        # in the list.560        if isinstance(parent_node, list):561            return self._build_name_to_xml_node(parent_node[0])562        xml_dict = {}563        for item in parent_node:564            key = self._node_tag(item)565            if key in xml_dict:566                # If the key already exists, the most natural567                # way to handle this is to aggregate repeated568                # keys into a single list.569                # <foo>1</foo><foo>2</foo> -> {'foo': [Node(1), Node(2)]}570                if isinstance(xml_dict[key], list):571                    xml_dict[key].append(item)572                else:573                    # Convert from a scalar to a list.574                    xml_dict[key] = [xml_dict[key], item]575            else:576                xml_dict[key] = item577        return xml_dict578    def _create_event_stream(self, request: dict, shape: Shape) -> any:579        # TODO handle event streams580        raise NotImplementedError("_create_event_stream")581class BaseJSONRequestParser(RequestParser, ABC):582    """583    The ``BaseJSONRequestParser`` is the base class for all JSON-based AWS service protocols.584    This base-class handles parsing the payload / body as JSON.585    """586    TIMESTAMP_FORMAT = "unixtimestamp"587    def _parse_structure(588        self, request: HttpRequest, shape: StructureShape, value: Optional[dict]589    ) -> Optional[dict]:590        if shape.is_document_type:591            final_parsed = value592        else:593            if value is None:594                # If the comes across the wire as "null" (None in python),595                # we should be returning this unchanged, instead of as an596                # empty dict.597                return None598            final_parsed = {}599            for member_name, member_shape in shape.members.items():600                json_name = member_shape.serialization.get("name", member_name)601                raw_value = value.get(json_name)602                if raw_value is not None:603                    final_parsed[member_name] = self._parse_shape(request, member_shape, raw_value)604                elif member_name in shape.required_members:605                    # If the member is required, but not existing, we explicitly set None606                    final_parsed[member_name] = None607        return final_parsed608    def _parse_map(609        self, request: HttpRequest, shape: MapShape, value: Optional[dict]610    ) -> Optional[dict]:611        if value is None:612            return None613        parsed = {}614        key_shape = shape.key615        value_shape = shape.value616        for key, value in value.items():617            actual_key = self._parse_shape(request, key_shape, key)618            actual_value = self._parse_shape(request, value_shape, value)619            parsed[actual_key] = actual_value620        return parsed621    def _parse_body_as_json(self, body_contents: bytes) -> dict:622        if not body_contents:623            return {}624        body = body_contents.decode(self.DEFAULT_ENCODING)625        try:626            original_parsed = json.loads(body)627            return original_parsed628        except ValueError:629            raise RequestParserError("HTTP body could not be parsed as JSON.")630    def _parse_boolean(self, request: HttpRequest, shape: Shape, node: bool) -> bool:631        return super()._noop_parser(request, shape, node)632class JSONRequestParser(BaseJSONRequestParser):633    """634    The ``JSONRequestParser`` is responsible for parsing incoming requests for services which use the ``json``635    protocol.636    The requests for these services encode the majority of their parameters as JSON in the request body.637    The operation is defined in an HTTP header field.638    **Experimental:** This parser is still experimental.639    When implementing services with this parser, some edge cases might not work out-of-the-box.640    """641    def parse(self, request: HttpRequest) -> Tuple[OperationModel, Any]:642        target = request["headers"]["X-Amz-Target"]643        _, operation_name = target.split(".")644        operation = self.service.operation_model(operation_name)645        shape = operation.input_shape...pcs.py
Source:pcs.py  
1import collections2import json3from eventool import ssh_cmds4from eventool import logger5import xmltodict6# from lxml import etree as xml_parser7from xml.dom import minidom as xml_parser8from eventool import parsers9LOG = logger.getLogger(__name__)10@parsers.cli_command("pcs", subparser="action")11class PCSMgmt(ssh_cmds.tmp_cmd):12    def __init__(self, ssh):13        super(PCSMgmt, self).__init__(ssh)14        self._dict_xml = None15        self._cluster = None16        # self._haproxy_conf = None17    @property18    def cluster(self):19        self._cluster = self._cluster or self.status_xml()20        return self._cluster21    @parsers.cli_choice(parser="pcs", subparser="action")22    @ssh_cmds.command_decorator23    def status(self):24        """Returns full PCS status. """25        cmd = "pcs status"26        return cmd, self._noop_parser27    @ssh_cmds.command_decorator28    def status_xml(self):29        LOG.debug("Get PCS data in XML format from HA node")30        cmd = "crm_mon -X"31        return cmd, self._parse_xml32    def _parse_xml(self, raw_xml):33        self._dict_xml = xmltodict.parse(raw_xml)34        # return raw_xml35        return xml_parser.parseString(raw_xml)36    def get_resource_node(self, resource):37        [node] = resource.getElementsByTagName("node")38        return node.getAttribute("name")39    def find_service_node(self, service):40        resources = self.find_resource(service)41        if not resources:42            # TODO(yfried): replace with a better exception43            raise Exception("resource {0} NotFound".44                            format(service))45        if len(resources) > 1:46            # TODO(yfried): replace with a better exception47            raise Exception("Found multiple matches for resource {s}".48                            format(service))49        resource = resources.pop()50        return self.get_resource_node(resource)51    def get_vip_dest(self, proj):52        vip = "-".join(["ip", proj, "adm"])53        vips = self.find_resource(vip, exact_match=False)54        if not vips:55            # todo(yfried) better exception56            raise Exception("No vip found for: {proj}. Unable to find vip "57                            "named: {vip}".format(proj=proj, vip=vip))58        vip_resource = vips.pop()59        return self.get_resource_node(vip_resource)60    @staticmethod61    def strip_node_name(name):62        prefix = "pcmk-"63        assert name.startswith(prefix)64        return name[len(prefix):]65    @staticmethod66    def _find_in_tree(root, tag, id, exact=True):67        match = "__eq__" if exact else "__contains__"68        return [r for r in root.getElementsByTagName(tag)69                if getattr(r.getAttribute("id"), match)(id)]70    def find_clone(self, service):71        TAG = "clone"72        service = "%s-clone" % service73        x_list = self._find_in_tree(self.cluster, TAG, service)74        if len(x_list) > 1:75            # TODO(yfried): replace with a better exception76            raise Exception("Found multiple matches for clone {s}".77                            format(service))78        return x_list79    def get_active_resources(self, clone):80        """Returns list of active resource for :clone81        list is of len=1 if clone is A/P, if clone is A/A will return all82        resource of this clone83        :param clone:84        :return:85        """86        # get service name from clone name87        service = clone.getAttribute("id")[:-len("-clone")]88        resources = self._find_in_tree(clone, "resource", service)89        def active(resource):90            return resource.getAttribute("active") == 'true' and \91                   resource.getAttribute("role") == 'Started'92        return [r for r in resources if active(r)]93    def find_resource(self, resource_id, exact_match=True):94        """95        :param resource_id: name of the resource96        :param exact_match: if False - return any resource whose name97        contains the :resource_id98        :return:99        """100        TAG = "resource"101        match = "exact " if exact_match else ""102        LOG.debug("find " + match + "match for resource %s" % resource_id)103        x_list = self._find_in_tree(self.cluster, TAG, resource_id,104                                    exact=exact_match)105        return x_list106    # @ssh_cmds.cli_choice(parser="pcs", handler="action")107    # def cluster_nodes(self):108    #     out = self.cluster.get("nodes")["node"]...ssh_cmds.py
Source:ssh_cmds.py  
...38        """39        assert output == ""40        return True41    @staticmethod42    def _noop_parser(out):43        """don't parse """44        return out45# def parser_exec(parser):46#     main.PARSERS[parser]["func"]47#     def decorator(c):48#         @functools.wraps(c)49#         def class_dec(*args, **kwargs):50#             return c(*args, **kwargs)51#         return class_dec52#     return decorator53@parsers.cli_command("raw", subparser="action")54class RAWcmd(tmp_cmd):55    @parsers.add_argument(dest="input", nargs="*", default=None)56    @parsers.cli_choice(parser="raw", subparser="action")...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
