How to use sqs_queue method in localstack

Best Python code snippet using localstack_python

test_sqs_queue.py

Source:test_sqs_queue.py Github

copy

Full Screen

1from cloudrail.knowledge.context.aws.aws_environment_context import AwsEnvironmentContext2from tests.knowledge.context.aws_context_test import AwsContextTest3from tests.knowledge.context.test_context_annotation import TestOptions, context4class TestSqsQueue(AwsContextTest):5 def get_component(self):6 return 'sqs_queues'7 @context(module_path="encrypted_at_rest")8 def test_encrypted_at_rest(self, ctx: AwsEnvironmentContext):9 self.assertEqual(len(ctx.sqs_queues), 2)10 sqs_queue = next((sqs_queue for sqs_queue in ctx.sqs_queues if sqs_queue.queue_name == 'sqs_encrypted'), None)11 self.assertTrue(sqs_queue.encrypted_at_rest)12 self.assertTrue(sqs_queue.arn)13 self.assertFalse(sqs_queue.resource_based_policy)14 if not sqs_queue.is_managed_by_iac:15 self.assertEqual(sqs_queue.get_cloud_resource_url(),16 'https://console.aws.amazon.com/sqs/v2/home?region=us-east-1#'17 '/queues/https%3A%2F%2Fqueue.amazonaws.com%2F115553109071%2Fsqs_encrypted')18 @context(module_path="encrypted_at_rest_with_customer_managed_cmk")19 def test_encrypted_at_rest_with_customer_managed_cmk(self, ctx: AwsEnvironmentContext):20 self.assertEqual(len(ctx.sqs_queues), 1)21 self.assertTrue(ctx.sqs_queues[0].encrypted_at_rest)22 self.assertTrue(ctx.sqs_queues[0].arn)23 self.assertFalse(ctx.sqs_queues[0].resource_based_policy)24 self.assertEqual(ctx.sqs_queues[0].queue_name, 'sqs_encrypted')25 self.assertEqual(ctx.sqs_queues[0].kms_key, 'arn:aws:kms:us-east-1:115553109071:key/ed8cfa36-e3fe-4981-a565-ed4ecd238d50')26 @context(module_path="no_encryption")27 def test_no_encryption(self, ctx: AwsEnvironmentContext):28 self.assertEqual(len(ctx.sqs_queues), 1)29 sqs_queue = ctx.sqs_queues[0]30 self.assertFalse(sqs_queue.encrypted_at_rest)31 self.assertFalse(sqs_queue.resource_based_policy)32 self.assertFalse(sqs_queue.tags)33 @context(module_path="bad_policy")34 def test_bad_policy(self, ctx: AwsEnvironmentContext):35 sqs_queue = ctx.sqs_queues[0]36 self.assertEqual(sqs_queue.resource_based_policy.statements[0].actions[0], 'sqs:*')37 self.assertTrue(sqs_queue.resource_based_policy.queue_name)38 self.assertFalse(sqs_queue.encrypted_at_rest)39 self.assertTrue(sqs_queue.arn)40 self.assertEqual(sqs_queue.queue_name, 'cloudrail-not-secure-queue')41 @context(module_path="good_policy")42 def test_good_policy(self, ctx: AwsEnvironmentContext):43 sqs_queue = ctx.sqs_queues[0]44 self.assertEqual(sqs_queue.resource_based_policy.statements[0].actions[0], 'sqs:SendMessage')45 self.assertTrue(sqs_queue.resource_based_policy.queue_name)46 self.assertFalse(sqs_queue.encrypted_at_rest)47 self.assertTrue(sqs_queue.arn)48 self.assertEqual(sqs_queue.queue_name, 'cloudrail-secure-queue')49 @context(module_path="secure_policy_existing_queue", base_scanner_data_for_iac='account-data-existing-sqs-queue-secure-policy.zip',50 test_options=TestOptions(run_cloudmapper=False))51 def test_secure_policy_existing_queue(self, ctx: AwsEnvironmentContext):52 sqs_queue = ctx.sqs_queues[0]53 self.assertEqual(sqs_queue.resource_based_policy.statements[0].actions[0], 'sqs:SendMessage')54 self.assertTrue(sqs_queue.resource_based_policy.queue_name)55 self.assertFalse(sqs_queue.encrypted_at_rest)56 self.assertTrue(sqs_queue.arn)57 self.assertEqual(sqs_queue.queue_name, 'cloudrail-secure-queue_2')58 self.assertEqual(sqs_queue.account, '115553109071')59 self.assertEqual(sqs_queue.region, 'us-east-1')60 @context(module_path="bad_policy_policy_inside_main_resource")61 def test_bad_policy_policy_inside_main_resource(self, ctx: AwsEnvironmentContext):62 sqs_queue = ctx.sqs_queues[0]63 self.assertEqual(sqs_queue.resource_based_policy.statements[0].actions[0], 'sqs:*')64 self.assertTrue(sqs_queue.resource_based_policy.queue_name)65 self.assertFalse(sqs_queue.encrypted_at_rest)66 self.assertTrue(sqs_queue.arn)67 self.assertEqual(sqs_queue.queue_name, 'cloudrail-not-secure-queue')68 @context(module_path="with_tags")69 def test_with_tags(self, ctx: AwsEnvironmentContext):70 sqs = next((sqs for sqs in ctx.sqs_queues if sqs.queue_name == 'sqs_non_encrypted'), None)71 self.assertIsNotNone(sqs)...

Full Screen

Full Screen

test_sqs_client.py

Source:test_sqs_client.py Github

copy

Full Screen

1import os2import pdb3from horey.aws_api.aws_clients.sqs_client import SQSClient4from horey.aws_api.aws_services_entities.sqs_queue import SQSQueue5from horey.h_logger import get_logger6from horey.aws_api.base_entities.aws_account import AWSAccount7from horey.common_utils.common_utils import CommonUtils8configuration_values_file_full_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "h_logger_configuration_values.py")9logger = get_logger(configuration_values_file_full_path=configuration_values_file_full_path)10accounts_file_full_path = os.path.abspath(os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "ignore", "aws_api_managed_accounts.py"))11accounts = CommonUtils.load_object_from_module(accounts_file_full_path, "main")12AWSAccount.set_aws_account(accounts["1111"])13AWSAccount.set_aws_region(accounts["1111"].regions['us-west-2'])14mock_values_file_path = os.path.abspath(os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "ignore", "mock_values.py"))15mock_values = CommonUtils.load_object_from_module(mock_values_file_path, "main")16def test_init_client():17 assert isinstance(SQSClient(), SQSClient)18def test_provision_queue():19 client = SQSClient()20 sqs_queue = SQSQueue({})21 sqs_queue.region = AWSAccount.get_aws_region()22 sqs_queue.name = "sqs_queue_horey_test"23 sqs_queue.visibility_timeout = "30"24 sqs_queue.maximum_message_size = "262144"25 sqs_queue.message_retention_period = "604800"26 sqs_queue.delay_seconds = "0"27 sqs_queue.tags = {"name": sqs_queue.name}28 client.provision_queue(sqs_queue)29 assert sqs_queue.queue_url is not None30def test_provision_queue_dlq():31 client = SQSClient()32 sqs_queue = SQSQueue({})33 sqs_queue.region = AWSAccount.get_aws_region()34 sqs_queue.name = "sqs_queue_horey_test_dlq"35 sqs_queue.visibility_timeout = "30"36 sqs_queue.maximum_message_size = "262144"37 sqs_queue.message_retention_period = "604800"38 sqs_queue.delay_seconds = "0"39 sqs_queue.tags = {"name": sqs_queue.name}40 client.provision_queue(sqs_queue)41 assert sqs_queue.queue_url is not None42def test_provision_queue_update():43 client = SQSClient()44 sqs_queue = SQSQueue({})45 sqs_queue.region = AWSAccount.get_aws_region()46 sqs_queue.name = "sqs_queue_horey_test"47 sqs_queue.visibility_timeout = "30"48 sqs_queue.maximum_message_size = "262144"49 sqs_queue.message_retention_period = "604800"50 sqs_queue.delay_seconds = "0"51 sqs_queue.redrive_policy = "{\"deadLetterTargetArn\":\"" + mock_values["dlq_arn"] + "\",\"maxReceiveCount\":1000}"52 sqs_queue.tags = {"name": sqs_queue.name}53 client.provision_queue(sqs_queue)54 assert sqs_queue.queue_url is not None55if __name__ == "__main__":56 test_init_client()57 test_provision_queue_dlq()58 test_provision_queue()...

Full Screen

Full Screen

resource.py

Source:resource.py Github

copy

Full Screen

...12 self.url = sqs_queue.url13 for method in getmembers(sqs_queue, ismethod):14 if not method[0].startswith('_') and method[0] not in ['receive_messages', 'send_message', 'send_messages']:15 setattr(self, method[0], method[1])16 def __get_sqs_queue(self):17 return self.__sqs_queue18 sqs_queue = property(__get_sqs_queue)19 def receive_jms_messages(self, **kwargs):20 kwargs['MessageAttributeNames'] = _add_required_message_attribute_names(kwargs.get('MessageAttributeNames') or [])21 return [ _create_jms_message(sqs_message) for sqs_message in self.sqs_queue.receive_messages(**kwargs) ]22 def send_bytes_message(self, JMSReplyTo=None, JMSCorrelationId=None, **kwargs):23 return self.sqs_queue.send_message(24 **_encode_jms_message(25 JMSMessageType=JMSMessageType.BYTE, 26 JMSReplyTo=JMSReplyTo, 27 JMSCorrelationId=JMSCorrelationId, 28 **kwargs)29 )30 def send_jms_messages(self, **kwargs):...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful