How to use _add_queue_attributes method in localstack

Best Python code snippet using localstack_python

sqs_listener.py

Source: sqs_listener.py (GitHub)

copy

Full Screen

# NOTE: this is an excerpt of sqs_listener.py; it begins and ends mid-file, which is
# what the "..." markers denote. The four statements below are the tail of a function
# whose `def` line lies above the excerpt (presumably `_set_queue_attributes`, which
# `forward_request` calls further down -- confirm against the full source). They are
# reproduced verbatim as comments because they are not valid on their own.
# ...
#     QUEUE_ATTRIBUTES[queue_url] = QUEUE_ATTRIBUTES.get(queue_url) or {}
#     QUEUE_ATTRIBUTES[queue_url].update(local_attrs)
#     forward_attrs = dict([(k, v) for k, v in attrs.items() if k not in UNSUPPORTED_ATTRIBUTE_NAMES])
#     return forward_attrs


def _add_queue_attributes(path, req_data, content_str, headers):
    """Append locally stored queue attributes to a GetQueueAttributes XML response.

    Does nothing unless SQS_BACKEND_IMPL is 'elasticmq'. Otherwise every attribute
    stored in QUEUE_ATTRIBUTES for the queue is added to the
    <GetQueueAttributesResult> element, provided it was requested (no attribute
    names requested, 'All' requested, or the name itself requested) and is not
    already present in the response.

    :param path: request path, used to derive the queue URL
    :param req_data: parsed request parameters
    :param content_str: XML response body returned by the backend
    :param headers: request headers, used to derive the queue URL
    :return: the (possibly extended) XML response body
    """
    # TODO remove this function if we stop using ElasticMQ entirely
    if SQS_BACKEND_IMPL != 'elasticmq':
        return content_str
    flags = re.MULTILINE | re.DOTALL
    queue_url = _queue_url(path, req_data, headers)
    requested_attributes = _format_attributes_names(req_data)
    regex = r'(.*<GetQueueAttributesResult>)(.*)(</GetQueueAttributesResult>.*)'
    result_match = re.match(regex, content_str, flags=flags)
    if not result_match:
        # No result element to patch; return the body untouched instead of
        # stitching together three copies of it.
        return content_str
    prefix, attrs, suffix = result_match.groups()
    for key, value in QUEUE_ATTRIBUTES.get(queue_url, {}).items():
        if requested_attributes and not requested_attributes.intersection({'All', key}):
            continue
        # re.search (not re.match): the <Name> element can appear anywhere in the
        # attribute list, whereas re.match only ever looks at the very beginning.
        if re.search(r'<Name>\s*%s\s*</Name>' % re.escape(key), attrs, flags=flags):
            continue
        attrs += '<Attribute><Name>%s</Name><Value>%s</Value></Attribute>' % (key, value)
    return prefix + attrs + suffix


def _fire_event(req_data, response):
    """Publish an analytics event for CreateQueue / DeleteQueue requests.

    The queue URL is taken from the parsed XML response (CreateQueue) or from the
    request parameters (DeleteQueue). Nothing is fired for other actions or when
    no queue URL could be determined.
    """
    action = req_data.get('Action')
    event_type = None
    queue_url = None
    if action == 'CreateQueue':
        event_type = event_publisher.EVENT_SQS_CREATE_QUEUE
        response_data = xmltodict.parse(response.content)
        if 'CreateQueueResponse' in response_data:
            queue_url = response_data['CreateQueueResponse']['CreateQueueResult']['QueueUrl']
    elif action == 'DeleteQueue':
        event_type = event_publisher.EVENT_SQS_DELETE_QUEUE
        queue_url = req_data.get('QueueUrl')
    if event_type and queue_url:
        event_publisher.fire_event(event_type, payload={'u': event_publisher.get_hash(queue_url)})


def _queue_url(path, req_data, headers):
    """Return the queue URL a request refers to.

    Prefers the explicit 'QueueUrl' request parameter. Otherwise the URL is built
    from the 'Host' header (or config.TEST_SQS_URL if there is none) plus the
    request path with any query string removed.
    """
    queue_url = req_data.get('QueueUrl')
    if queue_url:
        return queue_url
    url = config.TEST_SQS_URL
    if headers.get('Host'):
        url = '%s://%s' % (get_service_protocol(), headers['Host'])
    queue_url = '%s%s' % (url, path.partition('?')[0])
    return queue_url


def _list_dead_letter_source_queues(queues, queue_url):
    """Build the ListDeadLetterSourceQueues response for the given dead-letter queue.

    :param queues: mapping of queue URL -> attribute dict (QUEUE_ATTRIBUTES at the call site)
    :param queue_url: URL of the dead-letter queue whose source queues are wanted
    :return: XML response body listing every queue whose RedrivePolicy targets that queue
    """
    queue_name = queue_url.split('/')[-1]
    dead_letter_source_queues = []
    for source_url, attributes in queues.items():
        if 'RedrivePolicy' not in attributes:
            continue
        redrive_policy = json.loads(attributes['RedrivePolicy'])
        # Compare against the last ARN component only. A substring test over the
        # whole ARN would also match queues whose name merely contains this name,
        # or names that happen to occur in the region/account part of the ARN.
        if redrive_policy['deadLetterTargetArn'].rpartition(':')[2] == queue_name:
            dead_letter_source_queues.append(source_url)
    return format_list_dl_source_queues_response(dead_letter_source_queues)


def _process_sent_message(path, req_data, headers):
    """Hand the name of the queue that received a message to lambda_api.process_sqs_message."""
    queue_name = _queue_url(path, req_data, headers).rpartition('/')[2]
    lambda_api.process_sqs_message(queue_name)


def format_list_dl_source_queues_response(queues):
    """Render a ListDeadLetterSourceQueuesResponse XML document.

    :param queues: iterable of queue URLs, each emitted as a <QueueUrl> element
    :return: the XML document as a string
    """
    content_str = """<ListDeadLetterSourceQueuesResponse xmlns="{}">
        <ListDeadLetterSourceQueuesResult>
        {}
        </ListDeadLetterSourceQueuesResult>
    </ListDeadLetterSourceQueuesResponse>"""
    queue_urls = ''.join('<QueueUrl>{}</QueueUrl>'.format(q) for q in queues)
    return content_str.format(XMLNS_SQS, queue_urls)


def get_external_port(headers, request_handler):
    """Extract the external port used by the client to make the request.

    The port is read from the 'Host' header. Without a 'Host' header the
    'X-Forwarded-For' header is consulted instead: its second-to-last entry when
    it has more than two entries, otherwise its last entry. If no port can be
    found there, config.PORT_SQS is returned when there is no request handler
    (or the handler has no proxy); otherwise the proxy's port is returned.
    """
    host = headers.get('Host', '')
    if not host:
        forwarded = headers.get('X-Forwarded-For', '').split(',')
        host = forwarded[-2] if len(forwarded) > 2 else forwarded[-1]
    if ':' in host:
        return int(host.split(':')[1])
    if not request_handler or not request_handler.proxy:
        return config.PORT_SQS
    # If we cannot find the Host header, then fall back to the port of the proxy.
    # (note that this could be incorrect, e.g., if running in Docker with a host port that
    # is different from the internal container port, but there is not much else we can do.)
    return request_handler.proxy.port


def validate_empty_message_batch(data, req_data):
    """Return True if the raw payload mentions 'Entries=' but no 'Entries' value was parsed."""
    data = to_str(data).split('Entries=')
    if len(data) > 1 and not req_data.get('Entries'):
        return True
    return False


def is_sqs_queue_url(url):
    """Return a match object if the URL path looks like /queue/<name> or /<account id>/<name>, else None."""
    path = path_from_url(url).partition('?')[0]
    return re.match(r'^/(queue|%s)/[a-zA-Z0-9_-]+$' % constants.TEST_AWS_ACCOUNT_ID, path)


class ProxyListenerSQS(PersistingProxyListener):
    """Proxy listener that inspects and patches SQS requests and responses."""

    def api_name(self):
        """Return the API name of this listener ('sqs')."""
        return 'sqs'

    def forward_request(self, method, path, data, headers):
        """Inspect an incoming SQS request before it reaches the backend.

        Returns 200 for OPTIONS requests, an error response for invalid message
        contents (moto backend), a locally generated response for
        ListDeadLetterSourceQueues (elasticmq backend), a rewritten Request when
        the request carries a 'QueueName', and True otherwise (presumably meaning
        "forward the request unchanged" -- confirm against the proxy contract).
        """
        if method == 'OPTIONS':
            return 200
        req_data = parse_request_data(method, path, data)
        if is_sqs_queue_url(path) and method == 'GET':
            # A plain GET on a queue URL is turned into a GetQueueUrl POST request
            if not headers.get('Authorization'):
                headers['Authorization'] = aws_stack.mock_aws_request_headers(service='sqs')['Authorization']
            method = 'POST'
            req_data = {'Action': 'GetQueueUrl', 'Version': API_VERSION, 'QueueName': path.split('/')[-1]}
        if req_data:
            action = req_data.get('Action')
            if action in ('SendMessage', 'SendMessageBatch') and SQS_BACKEND_IMPL == 'moto':
                # check message contents
                for key, value in req_data.items():
                    if not re.match(MSG_CONTENT_REGEX, str(value)):
                        return make_requests_error(code=400, code_string='InvalidMessageContents',
                            message='Message contains invalid characters')
            elif action == 'SetQueueAttributes':
                # TODO remove this function if we stop using ElasticMQ entirely
                queue_url = _queue_url(path, req_data, headers)
                if SQS_BACKEND_IMPL == 'elasticmq':
                    forward_attrs = _set_queue_attributes(queue_url, req_data)
                    if len(req_data) != len(forward_attrs):
                        # make sure we only forward the supported attributes to the backend
                        return _get_attributes_forward_request(method, path, headers, req_data, forward_attrs)
            elif action == 'DeleteQueue':
                queue_url = _queue_url(path, req_data, headers)
                QUEUE_ATTRIBUTES.pop(queue_url, None)
                sns_listener.unsubscribe_sqs_queue(queue_url)
            elif action == 'ListDeadLetterSourceQueues':
                # TODO remove this function if we stop using ElasticMQ entirely
                queue_url = _queue_url(path, req_data, headers)
                if SQS_BACKEND_IMPL == 'elasticmq':
                    headers = {'content-type': 'application/xhtml+xml'}
                    content_str = _list_dead_letter_source_queues(QUEUE_ATTRIBUTES, queue_url)
                    return requests_response(content_str, headers=headers)
            if 'QueueName' in req_data:
                encoded_data = urlencode(req_data, doseq=True) if method == 'POST' else ''
                modified_url = None
                if method == 'GET':
                    base_path = path.partition('?')[0]
                    modified_url = '%s?%s' % (base_path, urlencode(req_data, doseq=True))
                return Request(data=encoded_data, url=modified_url, headers=headers, method=method)
        return True

    def return_response(self, method, path, data, headers, response, request_handler):
        """Patch a backend response before it is returned to the client.

        NOTE: only the first part of this method is visible in the excerpt.
        """
        # persist requests to disk
        super(ProxyListenerSQS, self).return_response(
            method, path, data, headers, response, request_handler
        )
        if method == 'OPTIONS' and path == '/':
            # Allow CORS preflight requests to succeed.
            return 200
        if method != 'POST':
            return
        region_name = aws_stack.get_region()
        req_data = parse_request_data(method, path, data)
        action = req_data.get('Action')
        content_str = content_str_original = to_str(response.content)
        if response.status_code >= 400:
            return response
        _fire_event(req_data, response)
        # patch the response and add missing attributes
        if action == 'GetQueueAttributes':
            content_str = _add_queue_attributes(path, req_data, content_str, headers)
        # patch the response and return the correct endpoint URLs / ARNs
        if action in ('CreateQueue', 'GetQueueUrl', 'ListQueues', 'GetQueueAttributes', 'ListDeadLetterSourceQueues'):
            if config.USE_SSL and '<QueueUrl>http://' in content_str:
                # return https://... if we're supposed to use SSL
                content_str = re.sub(r'<QueueUrl>\s*http://', r'<QueueUrl>https://', content_str)
            # expose external hostname:port
            external_port = SQS_PORT_EXTERNAL or get_external_port(headers, request_handler)
            content_str = re.sub(r'<QueueUrl>\s*([a-z]+)://[^<]*:([0-9]+)/([^<]*)\s*</QueueUrl>',
                r'<QueueUrl>\1://%s:%s/\3</QueueUrl>' % (HOSTNAME_EXTERNAL, external_port),
                content_str)
            # encode account ID in queue URL
            content_str = re.sub(r'<QueueUrl>\s*([a-z]+)://([^/]+)/queue/([^<]*)\s*</QueueUrl>',
                r'<QueueUrl>\1://\2/%s/\3</QueueUrl>' % constants.TEST_AWS_ACCOUNT_ID,
                content_str)
        # (the excerpt is cut off here; the remainder of this method is not shown)
        ...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g., Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful