Best Python code snippet using localstack_python
sqs_listener.py
Source:sqs_listener.py  
# ... (excerpt begins mid-function; the enclosing definition starts before this excerpt)
#         req_data_new['Attribute.%s.Value' % i] = [v]
#         i += 1
#     data = urlencode(req_data_new, doseq=True)
#     return Request(data=data, headers=headers, method=method)


def _set_queue_attributes(queue_url, req_data):
    """Record backend-unsupported queue attributes locally and return the rest.

    Attributes whose names are in UNSUPPORTED_ATTRIBUTE_NAMES are stored in
    QUEUE_ATTRIBUTES[queue_url]; all remaining attributes are returned as a dict.
    Returns None (and stores nothing) if the backend is not ElasticMQ.
    """
    # TODO remove this function if we stop using ElasticMQ entirely
    if SQS_BACKEND_IMPL != 'elasticmq':
        return
    attrs = _format_attributes(req_data)
    # select only the attributes in UNSUPPORTED_ATTRIBUTE_NAMES
    local_attrs = {}
    for name, raw_value in attrs.items():
        if name not in UNSUPPORTED_ATTRIBUTE_NAMES:
            continue
        try:
            parsed = json.loads(raw_value)
            if isinstance(parsed, dict) and 'maxReceiveCount' in parsed:
                # normalize the count to an integer before re-serializing
                parsed['maxReceiveCount'] = int(parsed['maxReceiveCount'])
            local_attrs[name] = json.dumps(parsed)
        except (TypeError, ValueError, OverflowError):
            # best effort: the value is not JSON (or the count is not numeric) - store it verbatim
            local_attrs[name] = raw_value
    QUEUE_ATTRIBUTES[queue_url] = QUEUE_ATTRIBUTES.get(queue_url) or {}
    QUEUE_ATTRIBUTES[queue_url].update(local_attrs)
    return {k: v for k, v in attrs.items() if k not in UNSUPPORTED_ATTRIBUTE_NAMES}


def _add_queue_attributes(path, req_data, content_str, headers):
    """Append locally stored attributes to a GetQueueAttributes XML response.

    Only attributes that were requested (or all of them, if 'All' or nothing was
    requested) and that are not already present in the response are added.
    Returns content_str unchanged if the backend is not ElasticMQ, or if the
    response contains no <GetQueueAttributesResult> element.
    """
    # TODO remove this function if we stop using ElasticMQ entirely
    if SQS_BACKEND_IMPL != 'elasticmq':
        return content_str
    flags = re.MULTILINE | re.DOTALL
    queue_url = _queue_url(path, req_data, headers)
    requested_attributes = _format_attributes_names(req_data)
    regex = r'(.*<GetQueueAttributesResult>)(.*)(</GetQueueAttributesResult>.*)'
    match = re.match(regex, content_str, flags=flags)
    if not match:
        # nothing to patch - avoid duplicating the document around itself
        return content_str
    head, attrs, tail = match.groups()
    for key, value in QUEUE_ATTRIBUTES.get(queue_url, {}).items():
        is_requested = not requested_attributes or requested_attributes.intersection({'All', key})
        # re.search (not re.match): the attribute may appear anywhere in the result body
        already_present = re.search(r'<Name>\s*%s\s*</Name>' % re.escape(key), attrs, flags=flags)
        if is_requested and not already_present:
            attrs += '<Attribute><Name>%s</Name><Value>%s</Value></Attribute>' % (key, value)
    return head + attrs + tail


def _fire_event(req_data, response):
    """Publish a create/delete queue event for CreateQueue and DeleteQueue actions.

    The event payload carries event_publisher.get_hash(queue_url); nothing is
    fired for other actions or when no queue URL can be determined.
    """
    action = req_data.get('Action')
    event_type = None
    queue_url = None
    if action == 'CreateQueue':
        event_type = event_publisher.EVENT_SQS_CREATE_QUEUE
        response_data = xmltodict.parse(response.content)
        if 'CreateQueueResponse' in response_data:
            queue_url = response_data['CreateQueueResponse']['CreateQueueResult']['QueueUrl']
    elif action == 'DeleteQueue':
        event_type = event_publisher.EVENT_SQS_DELETE_QUEUE
        queue_url = req_data.get('QueueUrl')
    if event_type and queue_url:
        event_publisher.fire_event(event_type, payload={'u': event_publisher.get_hash(queue_url)})


def _queue_url(path, req_data, headers):
    """Return the queue URL from the request data, or build it from Host header and path."""
    queue_url = req_data.get('QueueUrl')
    if queue_url:
        return queue_url
    url = config.TEST_SQS_URL
    if headers.get('Host'):
        url = '%s://%s' % (get_service_protocol(), headers['Host'])
    # drop any query string from the path
    return '%s%s' % (url, path.partition('?')[0])


def _list_dead_letter_source_queues(queues, queue_url):
    """Build the ListDeadLetterSourceQueues response for the given dead-letter queue.

    `queues` maps queue URLs to attribute dicts (as stored in QUEUE_ATTRIBUTES).
    A queue is a source queue if the last segment of the 'deadLetterTargetArn'
    in its 'RedrivePolicy' equals the last path segment of queue_url.
    """
    target_name = queue_url.split('/')[-1]
    dead_letter_source_queues = []
    for source_url, attributes in queues.items():
        if 'RedrivePolicy' not in attributes:
            continue
        try:
            policy = json.loads(attributes['RedrivePolicy'])
        except (TypeError, ValueError):
            # the policy may have been stored verbatim as a non-JSON string - ignore it
            continue
        if not isinstance(policy, dict):
            continue
        target_arn = str(policy.get('deadLetterTargetArn') or '')
        # compare the queue name exactly (a substring test would let "q" match "...:myq")
        if target_arn.split(':')[-1] == target_name:
            dead_letter_source_queues.append(source_url)
    return format_list_dl_source_queues_response(dead_letter_source_queues)


def _process_sent_message(path, req_data, headers):
    """Pass the name of the queue addressed by this request to lambda_api.process_sqs_message."""
    queue_name = _queue_url(path, req_data, headers).rpartition('/')[2]
    lambda_api.process_sqs_message(queue_name)


def format_list_dl_source_queues_response(queues):
    """Render the given queue URLs as a ListDeadLetterSourceQueuesResponse XML string."""
    content_str = """<ListDeadLetterSourceQueuesResponse xmlns="{}">
                        <ListDeadLetterSourceQueuesResult>
                        {}
                        </ListDeadLetterSourceQueuesResult>
                    </ListDeadLetterSourceQueuesResponse>"""
    queue_urls = ''.join('<QueueUrl>{}</QueueUrl>'.format(q) for q in queues)
    return content_str.format(XMLNS_SQS, queue_urls)


def _port_from_host(host):
    """Return the numeric port of a 'host:port' string, or None if it has no valid port."""
    host = host.strip()
    if host.startswith('['):
        # bracketed IPv6 literal, e.g. "[::1]:4576"
        remainder = host.partition(']')[2]
        port = remainder[1:] if remainder.startswith(':') else ''
    elif host.count(':') == 1:
        port = host.partition(':')[2].strip()
    else:
        # no port at all, or a bare IPv6 address whose colons are not port separators
        port = ''
    return int(port) if port.isdigit() else None


# extract the external port used by the client to make the request
def get_external_port(headers, request_handler):
    """Determine the port the client used, from the Host or X-Forwarded-For header.

    Falls back to config.PORT_SQS if there is no request handler / proxy, and to
    the proxy's port otherwise.
    """
    host = headers.get('Host', '')
    if not host:
        forwarded = headers.get('X-Forwarded-For', '').split(',')
        host = forwarded[-2] if len(forwarded) > 2 else forwarded[-1]
    port = _port_from_host(host)
    if port is not None:
        return port
    if not request_handler or not request_handler.proxy:
        return config.PORT_SQS
    # If we cannot find the Host header, then fall back to the port of the proxy.
    # (note that this could be incorrect, e.g., if running in Docker with a host port that
    # is different from the internal container port, but there is not much else we can do.)
    return request_handler.proxy.port


def validate_empty_message_batch(data, req_data):
    """Return True if the raw payload mentions 'Entries=' but the parsed request has no entries."""
    return 'Entries=' in to_str(data) and not req_data.get('Entries')


def is_sqs_queue_url(url):
    """Return a match object if the URL path looks like /queue/<name> or /<account-id>/<name>.

    Queue names may carry the '.fifo' suffix used by FIFO queues. Returns None otherwise.
    """
    path = path_from_url(url).partition('?')[0]
    return re.match(r'^/(queue|%s)/[a-zA-Z0-9_-]+(\.fifo)?$' % constants.TEST_AWS_ACCOUNT_ID, path)


class ProxyListenerSQS(PersistingProxyListener):
    """Proxy listener that patches SQS requests and responses passing through the proxy."""

    def api_name(self):
        return 'sqs'

    def forward_request(self, method, path, data, headers):
        """Inspect an incoming request before it is forwarded to the backend.

        Returns 200 for OPTIONS, an error/response object when the request is
        answered locally, a Request object when the forwarded request must be
        rewritten, or True to forward the request as-is.
        """
        if method == 'OPTIONS':
            return 200

        req_data = parse_request_data(method, path, data)

        if is_sqs_queue_url(path) and method == 'GET':
            # a plain GET on a queue URL is turned into a GetQueueUrl POST request
            if not headers.get('Authorization'):
                headers['Authorization'] = aws_stack.mock_aws_request_headers(service='sqs')['Authorization']
            method = 'POST'
            # strip the query string so that it does not end up in the queue name
            queue_name = path.partition('?')[0].split('/')[-1]
            req_data = {'Action': 'GetQueueUrl', 'Version': API_VERSION, 'QueueName': queue_name}

        if req_data:
            action = req_data.get('Action')

            if action in ('SendMessage', 'SendMessageBatch') and SQS_BACKEND_IMPL == 'moto':
                # check message contents
                for value in req_data.values():
                    if not re.match(MSG_CONTENT_REGEX, str(value)):
                        return make_requests_error(code=400, code_string='InvalidMessageContents',
                            message='Message contains invalid characters')

            elif action == 'SetQueueAttributes':
                # TODO remove this function if we stop using ElasticMQ entirely
                queue_url = _queue_url(path, req_data, headers)
                if SQS_BACKEND_IMPL == 'elasticmq':
                    forward_attrs = _set_queue_attributes(queue_url, req_data)
                    if len(req_data) != len(forward_attrs):
                        # make sure we only forward the supported attributes to the backend
                        return _get_attributes_forward_request(method, path, headers, req_data, forward_attrs)

            elif action == 'DeleteQueue':
                queue_url = _queue_url(path, req_data, headers)
                QUEUE_ATTRIBUTES.pop(queue_url, None)
                sns_listener.unsubscribe_sqs_queue(queue_url)

            elif action == 'ListDeadLetterSourceQueues':
                # TODO remove this function if we stop using ElasticMQ entirely
                queue_url = _queue_url(path, req_data, headers)
                if SQS_BACKEND_IMPL == 'elasticmq':
                    headers = {'content-type': 'application/xhtml+xml'}
                    content_str = _list_dead_letter_source_queues(QUEUE_ATTRIBUTES, queue_url)
                    return requests_response(content_str, headers=headers)

            if 'QueueName' in req_data:
                encoded_data = urlencode(req_data, doseq=True) if method == 'POST' else ''
                modified_url = None
                if method == 'GET':
                    base_path = path.partition('?')[0]
                    modified_url = '%s?%s' % (base_path, urlencode(req_data, doseq=True))
                return Request(data=encoded_data, url=modified_url, headers=headers, method=method)

        return True

    def return_response(self, method, path, data, headers, response, request_handler):
        """Post-process a backend response before it is returned to the client.

        Returns 200 for OPTIONS on '/', the unmodified response for error status
        codes, an error for empty message batches, a patched response if the
        content was changed, and None otherwise.
        """
        # persist requests to disk
        super(ProxyListenerSQS, self).return_response(
            method, path, data, headers, response, request_handler
        )

        if method == 'OPTIONS' and path == '/':
            # Allow CORS preflight requests to succeed.
            return 200

        if method != 'POST':
            return

        region_name = aws_stack.get_region()
        req_data = parse_request_data(method, path, data)
        action = req_data.get('Action')
        content_str = content_str_original = to_str(response.content)

        if response.status_code >= 400:
            return response

        _fire_event(req_data, response)

        # patch the response and add missing attributes
        if action == 'GetQueueAttributes':
            content_str = _add_queue_attributes(path, req_data, content_str, headers)

        # patch the response and return the correct endpoint URLs / ARNs
        if action in ('CreateQueue', 'GetQueueUrl', 'ListQueues', 'GetQueueAttributes', 'ListDeadLetterSourceQueues'):
            if config.USE_SSL and '<QueueUrl>http://' in content_str:
                # return https://... if we're supposed to use SSL
                content_str = re.sub(r'<QueueUrl>\s*http://', r'<QueueUrl>https://', content_str)
            # expose external hostname:port
            external_port = SQS_PORT_EXTERNAL or get_external_port(headers, request_handler)
            content_str = re.sub(r'<QueueUrl>\s*([a-z]+)://[^<]*:([0-9]+)/([^<]*)\s*</QueueUrl>',
                                 r'<QueueUrl>\1://%s:%s/\3</QueueUrl>' % (HOSTNAME_EXTERNAL, external_port),
                                 content_str)
            # encode account ID in queue URL
            content_str = re.sub(r'<QueueUrl>\s*([a-z]+)://([^/]+)/queue/([^<]*)\s*</QueueUrl>',
                                 r'<QueueUrl>\1://\2/%s/\3</QueueUrl>' % constants.TEST_AWS_ACCOUNT_ID,
                                 content_str)
            # fix queue ARN
            content_str = re.sub(r'<([a-zA-Z0-9]+)>\s*arn:aws:sqs:elasticmq:([^<]+)</([a-zA-Z0-9]+)>',
                                 r'<\1>arn:aws:sqs:%s:\2</\3>' % region_name, content_str)
            if action == 'CreateQueue':
                url_match = re.match(r'.*<QueueUrl>(.*)</QueueUrl>', content_str, re.DOTALL)
                # guard against responses without a <QueueUrl> element
                if url_match and SQS_BACKEND_IMPL == 'elasticmq':
                    _set_queue_attributes(url_match.group(1), req_data)

        elif action == 'SendMessageBatch':
            if validate_empty_message_batch(data, req_data):
                msg = 'There should be at least one SendMessageBatchRequestEntry in the request.'
                return make_requests_error(code=404, code_string='EmptyBatchRequest', message=msg)

        # instruct listeners to fetch new SQS message
        if action in ('SendMessage', 'SendMessageBatch'):
            _process_sent_message(path, req_data, headers)

        if content_str_original != content_str:
            # if changes have been made, return patched response
            # NOTE(review): content-length is the character count of content_str, not its
            # encoded byte length - confirm this is safe for non-ASCII payloads
            response.headers['content-length'] = len(content_str)
            response.headers['x-amz-crc32'] = calculate_crc32(content_str)
            return requests_response(content_str, headers=response.headers, status_code=response.status_code)

    # @classmethod
    # TODO still needed? (can probably be removed)
    # ... (excerpt ends here; the decorated method is not shown)


# Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
