Best Python code snippet using localstack_python
sqs_listener.py
Source:sqs_listener.py  
...217            if action in ("SendMessage", "SendMessageBatch") and SQS_BACKEND_IMPL == "moto":218                # check message contents219                for key, value in req_data.items():220                    if not re.match(MSG_CONTENT_REGEX, str(value)):221                        return make_requests_error(222                            code=400,223                            code_string="InvalidMessageContents",224                            message="Message contains invalid characters",225                        )226            elif action == "SetQueueAttributes":227                # TODO remove this function if we stop using ElasticMQ228                queue_url = _queue_url(path, req_data, headers)229                if SQS_BACKEND_IMPL == "elasticmq":230                    forward_attrs = _set_queue_attributes(queue_url, req_data)231                    if len(req_data) != len(forward_attrs):232                        # make sure we only forward the supported attributes to the backend233                        return _get_attributes_forward_request(234                            method, path, headers, req_data, forward_attrs235                        )236            elif action == "TagQueue":237                req_data = self.fix_missing_tag_values(req_data)238            elif action == "CreateQueue":239                req_data = self.fix_missing_tag_values(req_data)240                def _is_fifo():241                    for k, v in req_data.items():242                        if v == "FifoQueue":243                            return req_data[k.replace("Name", "Value")].lower() == "true"244                    return False245                if req_data.get("QueueName").endswith(".fifo") and not _is_fifo():246                    LOG.warn(247                        'You are trying to create a queue ending in ".fifo".  
Please use the --attributes parameter to set FifoQueue appropriately.'248                    )249                    msg = "Can only include alphanumeric characters, hyphens, or underscores. 1 to 80 in length"250                    return make_requests_error(251                        code=400, code_string="InvalidParameterValue", message=msg252                    )253                changed_attrs = _fix_dlq_arn_in_attributes(req_data)254                if changed_attrs:255                    return _get_attributes_forward_request(256                        method, path, headers, req_data, changed_attrs257                    )258            elif action == "DeleteQueue":259                queue_url = _queue_url(path, req_data, headers)260                QUEUE_ATTRIBUTES.pop(queue_url, None)261                sns_listener.unsubscribe_sqs_queue(queue_url)262            elif action == "ListDeadLetterSourceQueues":263                # TODO remove this function if we stop using ElasticMQ entirely264                queue_url = _queue_url(path, req_data, headers)265                if SQS_BACKEND_IMPL == "elasticmq":266                    headers = {"content-type": "application/xhtml+xml"}267                    content_str = _list_dead_letter_source_queues(QUEUE_ATTRIBUTES, queue_url)268                    return requests_response(content_str, headers=headers)269            if "QueueName" in req_data:270                encoded_data = urlencode(req_data, doseq=True) if method == "POST" else ""271                modified_url = None272                if method == "GET":273                    base_path = path.partition("?")[0]274                    modified_url = "%s?%s" % (275                        base_path,276                        urlencode(req_data, doseq=True),277                    )278                return Request(data=encoded_data, url=modified_url, headers=headers, method=method)279        return True280    def return_response(self, method, path, data, headers, 
response):281        # persist requests to disk282        super(ProxyListenerSQS, self).return_response(method, path, data, headers, response)283        if method == "OPTIONS" and path == "/":284            # Allow CORS preflight requests to succeed.285            return 200286        if method != "POST":287            return288        region_name = aws_stack.get_region()289        req_data = parse_request_data(method, path, data)290        action = req_data.get("Action")291        content_str = content_str_original = to_str(response.content)292        if response.status_code >= 400:293            return response294        _fire_event(req_data, response)295        # patch the response and add missing attributes296        if action == "GetQueueAttributes":297            content_str = _add_queue_attributes(path, req_data, content_str, headers)298        name = r"<Name>\s*RedrivePolicy\s*<\/Name>"299        value = r"<Value>\s*{(.*)}\s*<\/Value>"300        for p1, p2 in ((name, value), (value, name)):301            content_str = re.sub(302                r"<Attribute>\s*%s\s*%s\s*<\/Attribute>" % (p1, p2),303                _fix_redrive_policy,304                content_str,305            )306        # patch the response and return the correct endpoint URLs / ARNs307        if action in (308            "CreateQueue",309            "GetQueueUrl",310            "ListQueues",311            "GetQueueAttributes",312            "ListDeadLetterSourceQueues",313        ):314            if config.USE_SSL and "<QueueUrl>http://" in content_str:315                # return https://... 
if we're supposed to use SSL316                content_str = re.sub(r"<QueueUrl>\s*http://", r"<QueueUrl>https://", content_str)317            # expose external hostname:port318            external_port = SQS_PORT_EXTERNAL or get_external_port(headers)319            content_str = re.sub(320                r"<QueueUrl>\s*([a-z]+)://[^<]*:([0-9]+)/([^<]*)\s*</QueueUrl>",321                r"<QueueUrl>\1://%s:%s/\3</QueueUrl>" % (config.HOSTNAME_EXTERNAL, external_port),322                content_str,323            )324            # encode account ID in queue URL325            content_str = re.sub(326                r"<QueueUrl>\s*([a-z]+)://([^/]+)/queue/([^<]*)\s*</QueueUrl>",327                r"<QueueUrl>\1://\2/%s/\3</QueueUrl>" % constants.TEST_AWS_ACCOUNT_ID,328                content_str,329            )330            # fix queue ARN331            content_str = re.sub(332                r"<([a-zA-Z0-9]+)>\s*arn:aws:sqs:elasticmq:([^<]+)</([a-zA-Z0-9]+)>",333                r"<\1>arn:aws:sqs:%s:\2</\3>" % region_name,334                content_str,335            )336            if action == "CreateQueue":337                regex = r".*<QueueUrl>(.*)</QueueUrl>"338                queue_url = re.match(regex, content_str, re.DOTALL).group(1)339                if SQS_BACKEND_IMPL == "elasticmq":340                    _set_queue_attributes(queue_url, req_data)341        elif action == "SendMessageBatch":342            if validate_empty_message_batch(data, req_data):343                msg = "There should be at least one SendMessageBatchRequestEntry in the request."344                return make_requests_error(code=404, code_string="EmptyBatchRequest", message=msg)345        if content_str_original != content_str:346            # if changes have been made, return patched response347            response.headers["Content-Length"] = len(content_str)348            response.headers["x-amz-crc32"] = calculate_crc32(content_str)349            return requests_response(350     
           content_str, headers=response.headers, status_code=response.status_code351            )352    @classmethod353    # TODO still needed? (can probably be removed)354    def get_message_attributes_md5(cls, req_data):355        req_data = clone(req_data)356        orig_types = {}357        for key, entry in dict(req_data).items():358            # Fix an issue in moto where data types like 'Number.java.lang.Integer' are...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
