Best Python code snippet using localstack_python
sqs_listener.py
Source:sqs_listener.py  
...65        if key not in req_data:66            break67        result.add(req_data[key])68    return result69def _get_attributes_forward_request(method, path, headers, req_data, forward_attrs):70    req_data_new = {k: v for k, v in req_data.items() if not k.startswith("Attribute.")}71    i = 172    for k, v in forward_attrs.items():73        req_data_new["Attribute.%s.Name" % i] = [k]74        req_data_new["Attribute.%s.Value" % i] = [v]75        i += 176    data = urlencode(req_data_new, doseq=True)77    return Request(data=data, headers=headers, method=method)78def _set_queue_attributes(queue_url, req_data):79    # TODO remove this function if we stop using ElasticMQ entirely80    if SQS_BACKEND_IMPL != "elasticmq":81        return82    attrs = _format_attributes(req_data)83    # select only the attributes in UNSUPPORTED_ATTRIBUTE_NAMES84    local_attrs = {}85    for k, v in attrs.items():86        if k in UNSUPPORTED_ATTRIBUTE_NAMES:87            try:88                _v = json.loads(v)89                if isinstance(_v, dict):90                    if "maxReceiveCount" in _v:91                        _v["maxReceiveCount"] = int(_v["maxReceiveCount"])92                local_attrs.update(dict({k: json.dumps(_v)}))93            except Exception:94                local_attrs.update(dict({k: v}))95    QUEUE_ATTRIBUTES[queue_url] = QUEUE_ATTRIBUTES.get(queue_url) or {}96    QUEUE_ATTRIBUTES[queue_url].update(local_attrs)97    forward_attrs = {k: v for k, v in attrs.items() if k not in UNSUPPORTED_ATTRIBUTE_NAMES}98    return forward_attrs99def _fix_dlq_arn_in_attributes(req_data):100    """Convert queue URL to ARN for DLQ in redrive policy config."""101    attrs = _format_attributes(req_data)102    policy = json.loads(attrs.get("RedrivePolicy") or "{}")103    dlq_arn = policy.get("deadLetterTargetArn", "")104    if "://" in dlq_arn:105        # convert queue URL to queue ARN106        policy["deadLetterTargetArn"] = aws_stack.sqs_queue_arn(dlq_arn)107        attrs["RedrivePolicy"] = json.dumps(policy)108        return attrs109def _fix_redrive_policy(match):110    result = "<Attribute><Name>RedrivePolicy</Name><Value>{%s}</Value></Attribute>" % (111        match.group(1).replace(" ", "")112    )113    return result114def _add_queue_attributes(path, req_data, content_str, headers):115    # TODO remove this function if we stop using ElasticMQ entirely116    if SQS_BACKEND_IMPL != "elasticmq":117        return content_str118    flags = re.MULTILINE | re.DOTALL119    queue_url = _queue_url(path, req_data, headers)120    requested_attributes = _format_attributes_names(req_data)121    regex = r"(.*<GetQueueAttributesResult>)(.*)(</GetQueueAttributesResult>.*)"122    attrs = re.sub(regex, r"\2", content_str, flags=flags)123    for key, value in QUEUE_ATTRIBUTES.get(queue_url, {}).items():124        if (125            not requested_attributes or requested_attributes.intersection({"All", key})126        ) and not re.match(r"<Name>\s*%s\s*</Name>" % key, attrs, flags=flags):127            attrs += "<Attribute><Name>%s</Name><Value>%s</Value></Attribute>" % (128                key,129                value,130            )131    content_str = (132        re.sub(regex, r"\1", content_str, flags=flags)133        + attrs134        + re.sub(regex, r"\3", content_str, flags=flags)135    )136    return content_str137def _fire_event(req_data, response):138    action = req_data.get("Action")139    event_type = None140    queue_url = None141    if action == "CreateQueue":142        event_type = event_publisher.EVENT_SQS_CREATE_QUEUE143        response_data = xmltodict.parse(response.content)144        if "CreateQueueResponse" in response_data:145            queue_url = response_data["CreateQueueResponse"]["CreateQueueResult"]["QueueUrl"]146    elif action == "DeleteQueue":147        event_type = event_publisher.EVENT_SQS_DELETE_QUEUE148        queue_url = req_data.get("QueueUrl")149    if event_type and queue_url:150        event_publisher.fire_event(event_type, payload={"u": event_publisher.get_hash(queue_url)})151def _queue_url(path, req_data, headers):152    queue_url = req_data.get("QueueUrl")153    if queue_url:154        return queue_url155    url = config.service_url("sqs")156    if headers.get("Host"):157        url = "%s://%s" % (get_service_protocol(), headers["Host"])158    queue_url = "%s%s" % (url, path.partition("?")[0])159    return queue_url160def _list_dead_letter_source_queues(queues, queue_url):161    dead_letter_source_queues = []162    for k, v in queues.items():163        for i, j in v.items():164            if i == "RedrivePolicy":165                f = json.loads(v[i])166                queue_url_split = queue_url.split("/")167                if queue_url_split[-1] in f["deadLetterTargetArn"]:168                    dead_letter_source_queues.append(k)169    return format_list_dl_source_queues_response(dead_letter_source_queues)170def format_list_dl_source_queues_response(queues):171    content_str = """<ListDeadLetterSourceQueuesResponse xmlns="{}">172                        <ListDeadLetterSourceQueuesResult>173                        {}174                        </ListDeadLetterSourceQueuesResult>175                    </ListDeadLetterSourceQueuesResponse>"""176    queue_urls = ""177    for q in queues:178        queue_urls += "<QueueUrl>{}</QueueUrl>".format(q)179    return content_str.format(XMLNS_SQS, queue_urls)180# extract the external port used by the client to make the request181def get_external_port(headers):182    host = headers.get("Host", "")183    if not host:184        forwarded = headers.get("X-Forwarded-For", "").split(",")185        host = forwarded[-2] if len(forwarded) > 2 else forwarded[-1]186    if ":" in host:187        return int(host.split(":")[1])188    # If we cannot find the Host header, then fall back to the port of SQS itself (i.e., edge proxy).189    # (Note that this could be incorrect, e.g., if running in Docker with a host port that190    #  is different from the internal container port, but there is not much else we can do.)191    return config.service_port("sqs")192def validate_empty_message_batch(data, req_data):193    data = to_str(data).split("Entries=")194    if len(data) > 1 and not req_data.get("Entries"):195        return True196    return False197class ProxyListenerSQS(PersistingProxyListener):198    def api_name(self):199        return "sqs"200    def forward_request(self, method, path, data, headers):201        if method == "OPTIONS":202            return 200203        req_data = parse_request_data(method, path, data)204        if is_sqs_queue_url(path) and method == "GET":205            if not headers.get("Authorization"):206                headers["Authorization"] = aws_stack.mock_aws_request_headers(service="sqs")[207                    "Authorization"208                ]209            method = "POST"210            req_data = {211                "Action": "GetQueueUrl",212                "Version": API_VERSION,213                "QueueName": path.split("/")[-1],214            }215        if req_data:216            action = req_data.get("Action")217            if action in ("SendMessage", "SendMessageBatch") and SQS_BACKEND_IMPL == "moto":218                # check message contents219                for key, value in req_data.items():220                    if not re.match(MSG_CONTENT_REGEX, str(value)):221                        return make_requests_error(222                            code=400,223                            code_string="InvalidMessageContents",224                            message="Message contains invalid characters",225                        )226            elif action == "SetQueueAttributes":227                # TODO remove this function if we stop using ElasticMQ228                queue_url = _queue_url(path, req_data, headers)229                if SQS_BACKEND_IMPL == "elasticmq":230                    forward_attrs = _set_queue_attributes(queue_url, req_data)231                    if len(req_data) != len(forward_attrs):232                        # make sure we only forward the supported attributes to the backend233                        return _get_attributes_forward_request(234                            method, path, headers, req_data, forward_attrs235                        )236            elif action == "TagQueue":237                req_data = self.fix_missing_tag_values(req_data)238            elif action == "CreateQueue":239                req_data = self.fix_missing_tag_values(req_data)240                def _is_fifo():241                    for k, v in req_data.items():242                        if v == "FifoQueue":243                            return req_data[k.replace("Name", "Value")].lower() == "true"244                    return False245                if req_data.get("QueueName").endswith(".fifo") and not _is_fifo():246                    LOG.warn(247                        'You are trying to create a queue ending in ".fifo".  Please use the --attributes parameter to set FifoQueue appropriately.'248                    )249                    msg = "Can only include alphanumeric characters, hyphens, or underscores. 1 to 80 in length"250                    return make_requests_error(251                        code=400, code_string="InvalidParameterValue", message=msg252                    )253                changed_attrs = _fix_dlq_arn_in_attributes(req_data)254                if changed_attrs:255                    return _get_attributes_forward_request(256                        method, path, headers, req_data, changed_attrs257                    )258            elif action == "DeleteQueue":259                queue_url = _queue_url(path, req_data, headers)260                QUEUE_ATTRIBUTES.pop(queue_url, None)261                sns_listener.unsubscribe_sqs_queue(queue_url)262            elif action == "ListDeadLetterSourceQueues":263                # TODO remove this function if we stop using ElasticMQ entirely264                queue_url = _queue_url(path, req_data, headers)265                if SQS_BACKEND_IMPL == "elasticmq":266                    headers = {"content-type": "application/xhtml+xml"}267                    content_str = _list_dead_letter_source_queues(QUEUE_ATTRIBUTES, queue_url)268                    return requests_response(content_str, headers=headers)269            if "QueueName" in req_data:...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
