How to use the list_queue_tags method in localstack

Best Python code snippet using localstack_python

test_sqs.py

Source:test_sqs.py Github

copy

Full Screen

1"""2Tests for the sqs module.3"""4from unittest import TestCase, mock5from .test_widgets import expected_queue_list6from ..sqs import (SqsApiCalls, create_sqs_widgets)7class TestCreateSQSWidgets(TestCase):8 def setUp(self):9 self.deploy_stage = 'DEV'10 self.region = 'us-south-10'11 self.client_type = 'sqs'12 self.max_results = 1013 self.next_token = 'some huge string'14 self.valid_queue_url_1 = "https://us-west-2.queue.amazonaws.com/579777464052/aqts-capture-error-queue-DEV"15 self.valid_queue_url_2 = "https://us-west-2.queue.amazonaws.com/579777464052/aqts-capture-trigger-queue-DEV"16 self.valid_queue_url_3 = "https://us-west-2.queue.amazonaws.com/579777464052/a-neat-valid-queue-DEV"17 self.invalid_queue_url_1 = "https://us-west-2.queue.amazonaws.com/579777464052/some-queue-TEST"18 self.valid_queue_name_1 = "aqts-capture-error-queue-DEV"19 self.valid_queue_name_2 = "aqts-capture-trigger-queue-DEV"20 self.valid_queue_name_3 = "a-neat-valid-queue-DEV"21 self.queue_list_no_next_token = {22 "QueueUrls": [23 self.valid_queue_url_124 ]25 }26 self.queue_list_with_next_token_1 = {27 "QueueUrls": [28 self.valid_queue_url_2,29 self.invalid_queue_url_130 ],31 "NextToken": self.next_token32 }33 self.queue_list_after_successful_pagination = {34 "QueueUrls": [35 self.valid_queue_url_2,36 self.invalid_queue_url_1,37 self.valid_queue_url_138 ],39 "NextToken": self.next_token40 }41 self.full_queue_list = {42 "QueueUrls": [43 self.valid_queue_url_2,44 self.invalid_queue_url_1,45 self.valid_queue_url_1,46 self.valid_queue_url_347 ],48 "NextToken": self.next_token49 }50 self.tags_list_for_valid_queue_url_1 = {51 'Tags': {52 'wma:organization': 'IOW'53 }54 }55 self.tags_list_empty_tags = {}56 self.tags_list_no_tags = {57 'Tags': {}58 }59 self.tags_list_no_wma_org_key = {60 'Tags': {61 'someKey': 'IOW'62 }63 }64 self.tags_list_no_iow_value = {65 'Tags': {66 'wma:organization': 'not IOW tag'67 }68 }69 @mock.patch('cloudwatch_monitoring.sqs.boto3.client', autospec=True)70 def 
test_get_all_sqs_queue_urls(self, m_client):71 mock_sqs_client = mock.Mock()72 m_client.return_value = mock_sqs_client73 api_calls = SqsApiCalls(self.region, self.deploy_stage)74 # only one queue returned from list_queues75 mock_sqs_client.list_queues.return_value = self.queue_list_no_next_token76 # noinspection PyPackageRequirements77 self.assertDictEqual(78 api_calls.get_all_sqs_queue_urls(),79 self.queue_list_no_next_token80 )81 # assert the boto3 sqs client was called with expected params82 m_client.assert_called_with(self.client_type, region_name=self.region)83 # assert the sqs client called list_queues with expected arguments84 mock_sqs_client.list_queues.assert_called_with(MaxResults=self.max_results)85 @mock.patch('cloudwatch_monitoring.sqs.boto3.client', autospec=True)86 def test_get_all_sqs_queue_urls_next_token_pagination(self, m_client):87 mock_sqs_client = mock.Mock()88 m_client.return_value = mock_sqs_client89 api_calls = SqsApiCalls(self.region, self.deploy_stage)90 # 2 queues returned, the first queue has the key NextToken that causes us to begin iterating through pages91 # of list_queues responses92 mock_sqs_client.list_queues.side_effect = [93 self.queue_list_with_next_token_1,94 self.queue_list_no_next_token95 ]96 expected = [97 mock.call(MaxResults=self.max_results),98 mock.call(MaxResults=self.max_results, NextToken=self.next_token)99 ]100 # Assert we get the expected queue list after 1 pagination iteration101 # noinspection PyPackageRequirements102 self.assertDictEqual(103 api_calls.get_all_sqs_queue_urls(),104 self.queue_list_after_successful_pagination105 )106 # assert the boto3 sqs client was called with expected params107 m_client.assert_called_with(self.client_type, region_name=self.region)108 # assert the list_queues calls were called, and in the expected order109 mock_sqs_client.list_queues.assert_has_calls(expected, any_order=False)110 @mock.patch('cloudwatch_monitoring.sqs.boto3.client', autospec=True)111 def 
test_is_iow_queue_filter_happy_path(self, m_client):112 mock_sqs_client = mock.Mock()113 m_client.return_value = mock_sqs_client114 mock_sqs_client.list_queue_tags.return_value = self.tags_list_for_valid_queue_url_1115 api_calls = SqsApiCalls(self.region, self.deploy_stage)116 # assert the return value is true, since list_queue_tags returned a valid response117 # noinspection PyPackageRequirements118 self.assertTrue(119 api_calls.is_iow_queue_filter(self.valid_queue_url_1)120 )121 # assert the boto3 sqs client was called with expected params122 m_client.assert_called_with(self.client_type, region_name=self.region)123 # assert the sqs client called list_queue_tags with expected arguments124 mock_sqs_client.list_queue_tags.assert_called_with(QueueUrl=self.valid_queue_url_1)125 @mock.patch('cloudwatch_monitoring.sqs.boto3.client', autospec=True)126 def test_is_iow_queue_filter_empty_tags_response(self, m_client):127 mock_sqs_client = mock.Mock()128 m_client.return_value = mock_sqs_client129 mock_sqs_client.list_queue_tags.return_value = self.tags_list_empty_tags130 api_calls = SqsApiCalls(self.region, self.deploy_stage)131 # assert the return value is False, since list_queue_tags returned a response with no 'Tags' key132 # noinspection PyPackageRequirements133 self.assertFalse(134 api_calls.is_iow_queue_filter(self.valid_queue_url_1)135 )136 @mock.patch('cloudwatch_monitoring.sqs.boto3.client', autospec=True)137 def test_is_iow_queue_filter_no_tags_response(self, m_client):138 mock_sqs_client = mock.Mock()139 m_client.return_value = mock_sqs_client140 mock_sqs_client.list_queue_tags.return_value = self.tags_list_no_tags141 api_calls = SqsApiCalls(self.region, self.deploy_stage)142 # assert the return value is False, since list_queue_tags returned a response with no tags in the Tags dict143 # noinspection PyPackageRequirements144 self.assertFalse(145 api_calls.is_iow_queue_filter(self.valid_queue_url_1)146 )147 @mock.patch('cloudwatch_monitoring.sqs.boto3.client', 
autospec=True)148 def test_is_iow_queue_filter_no_wma_org_key_response(self, m_client):149 mock_sqs_client = mock.Mock()150 m_client.return_value = mock_sqs_client151 mock_sqs_client.list_queue_tags.return_value = self.tags_list_no_wma_org_key152 api_calls = SqsApiCalls(self.region, self.deploy_stage)153 # assert the return value is False, since list_queue_tags returned a response with no wma:organization key in154 # the Tags dict155 # noinspection PyPackageRequirements156 self.assertFalse(157 api_calls.is_iow_queue_filter(self.valid_queue_url_1)158 )159 @mock.patch('cloudwatch_monitoring.sqs.boto3.client', autospec=True)160 def test_is_iow_queue_filter_no_iow_value_response(self, m_client):161 mock_sqs_client = mock.Mock()162 m_client.return_value = mock_sqs_client163 mock_sqs_client.list_queue_tags.return_value = self.tags_list_no_iow_value164 api_calls = SqsApiCalls(self.region, self.deploy_stage)165 # assert the return value is False, since list_queue_tags returned a response with a valid wma:organization166 # key, but the value is not "IOW"167 # noinspection PyPackageRequirements168 self.assertFalse(169 api_calls.is_iow_queue_filter(self.valid_queue_url_1)170 )171 @mock.patch('cloudwatch_monitoring.sqs.SqsApiCalls', autospec=True)172 def test_create_sqs_widgets(self, m_api_calls):173 # return values174 m_api_calls.return_value.get_all_sqs_queue_urls.return_value = self.full_queue_list175 m_api_calls.return_value.is_iow_queue_filter.side_effect = [176 True, False, True, True177 ]178 # expected calls179 expected_is_iow_queue_filter_calls = [180 mock.call(self.valid_queue_url_2),181 mock.call(self.invalid_queue_url_1),182 mock.call(self.valid_queue_url_1),183 mock.call(self.valid_queue_url_3)184 ]185 # Make sure the resultant widget list is correct186 # noinspection PyPackageRequirements187 self.assertListEqual(188 create_sqs_widgets(self.region, self.deploy_stage),189 expected_queue_list190 )191 # assert our helper methods were called the expected number of 
times and in the proper order192 m_api_calls.return_value.get_all_sqs_queue_urls.assert_called_once()...

Full Screen

Full Screen

create_sqs.py

Source:create_sqs.py Github

copy

Full Screen

...28 print("Queue URL is " + self.queue.url)29 queue_name = self.queue.url.split('/')[4]30 print("Queue name is " + queue_name)31 # Check if Tags exists, else32 if u'Tags' in client.list_queue_tags(QueueUrl=self.queue.url):33 tags_list = client.list_queue_tags(QueueUrl=self.queue.url)[u'Tags']34 else:35 print("Tags doesn't exists, skipping or exit")36 tags_list = {}37 if 'Owner' in tags_list:38 topic_name = client.list_queue_tags(QueueUrl=self.queue.url)[u'Tags']['Owner']39 print("Value for tag Owner is " + topic_name)40 else:41 print("Tag Owner doesn't exists, picking up default Owner")42 topic_name = default_owner43 if 'Email' in tags_list:44 subscriptions = client.list_queue_tags(QueueUrl=self.queue.url)[u'Tags']['Email']45 print("Value for tag Email is " + subscriptions)46 else:47 print("Tag Email doesn't exists, picking up default subscriptions")48 subscriptions = default_subscription49 50 # Create Topic and Subscriptions51 create_TnS = create_topic_and_subscription.create_topic_and_subscription(topic_name,subscriptions)52 topic_arn = create_TnS.create_topic_and_sub()53 print("Topic arn is " + topic_arn)54 55 # Create Alarms for each metrics56 for metric in metrics:57 if metric in tags_list and metric in client.list_queue_tags(QueueUrl=self.queue.url)[u'Tags']:58 thresholds[metric]["threshold"] = client.list_queue_tags(QueueUrl=self.queue.url)[u'Tags'][metric]59 else:60 print(metric + " not defined in tags, so picking up default value")61 62 # If perecent is defined in threshold, threshold will be % of source63 # Like threshold is "75 percent" for ApproximateAgeOfOldestMessage, threshold set will be 75% of MessageRetentionPeriod64 if "percent" in thresholds[metric]["threshold"]:65 source = self.queue.attributes[thresholds[metric]["source"]]66 print(thresholds[metric]["source"] + " set for queue " + queue_name + " is " + source)67 percent = thresholds[metric]["threshold"].replace('percent','').replace(' ','')68 print("Threshold %age set for queue " + 
queue_name + " will be " + percent + "% of " + thresholds[metric]["source"])69 thresholds[metric]["threshold"] = int(source)*int(percent)//10070 print("Setting threshold value to " + str(thresholds[metric]["threshold"]) + thresholds[metric]["unit"])71 create_alarm = create_alarms.create_alarms("SQS","QueueName",queue_name,metric,thresholds[metric]["operator"],thresholds[metric]["threshold"],thresholds[metric]["unit"],topic_arn)72 create_alarm.create_cloudwatch_alarms()

Full Screen

Full Screen

list_sqs_tags.py

Source:list_sqs_tags.py Github

copy

Full Screen

...53 client = get_sts_client(account, region)54 try:55 if 'QueueUrls' in client.list_queues():56 for url in client.list_queues()['QueueUrls']:57 if 'Tags' in client.list_queue_tags(QueueUrl=url):58 print(59 f"The queue {url.rsplit('/', 1)[1]} have tags {client.list_queue_tags(QueueUrl=url)['Tags']}")60 writer.writerow(61 [S_No, account, url.rsplit('/', 1)[1], client.list_queue_tags(QueueUrl=url)['Tags']])62 S_No += 163 else:64 print(65 f" The queue {url.rsplit('/', 1)[1]} have no tags")66 writer.writerow(67 [S_No, account, url.rsplit('/', 1)[1]])68 S_No += 169 except ClientError as e:70 print("queue doesn't exist error" + e)71def main():72 return sqs_queue_tags()73if __name__ == "__main__":...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful