How to use sqs_queue_name method in localstack

Best Python code snippet using localstack_python

test_utils.py

Source:test_utils.py Github

copy

Full Screen

1import os2import unittest3import random4import string5from django.conf import settings6from challenges.utils import get_file_content7from base.utils import get_queue_name8class BaseTestCase(unittest.TestCase):9 def setUp(self):10 self.test_file_path = os.path.join(11 settings.BASE_DIR, "examples", "example1", "test_annotation.txt"12 )13 self.sqs_valid_characters = (14 string.ascii_lowercase15 + string.ascii_uppercase16 + string.digits17 + "-"18 + "_"19 )20 def test_get_file_content(self):21 test_file_content = get_file_content(self.test_file_path, "rb")22 expected = "1\n2\n3\n4\n5\n6\n7\n8\n9\n10\n"23 self.assertEqual(test_file_content.decode(), expected)24 def test_sqs_queue_name_generator_long_title(self):25 title = "".join(26 [random.choice(self.sqs_valid_characters) for i in range(256)]27 )28 challenge_pk = 129 sqs_queue_name = get_queue_name(title, challenge_pk)30 self.assertNotRegex(sqs_queue_name, "[^a-zA-Z0-9_-]")31 self.assertLessEqual(len(sqs_queue_name), 80)32 def test_sqs_queue_name_generator_title_has_special_char(self):33 title = "".join([random.choice(string.printable) for i in range(80)])34 challenge_pk = 135 sqs_queue_name = get_queue_name(title, challenge_pk)36 self.assertNotRegex(sqs_queue_name, "[^a-zA-Z0-9_-]")37 self.assertLessEqual(len(sqs_queue_name), 80)38 def test_sqs_queue_name_generator_title_has_special_char_and_long_title(39 self,40 ):41 title = "".join([random.choice(string.printable) for i in range(256)])42 challenge_pk = 143 sqs_queue_name = get_queue_name(title, challenge_pk)44 self.assertNotRegex(sqs_queue_name, "[^a-zA-Z0-9_-]")45 self.assertLessEqual(len(sqs_queue_name), 80)46 def test_sqs_queue_name_generator_empty_title(self):47 title = ""48 challenge_pk = 149 sqs_queue_name = get_queue_name(title, challenge_pk)50 self.assertNotRegex(sqs_queue_name, "[^a-zA-Z0-9_-]")...

Full Screen

Full Screen

queues.py

Source:queues.py Github

copy

Full Screen

1import boto32import botocore3import json4import shutil5import glob26import os7import time8from pywren import wrenhandler, wrenutil, local9SOURCE_DIR = os.path.dirname(os.path.abspath(__file__)) 10class SQSInvoker(object):11 def __init__(self, region_name, sqs_queue_name):12 self.region_name = region_name13 self.sqs_queue_name = sqs_queue_name14 self.sqs = boto3.resource('sqs', region_name=region_name)15 self.queue = self.sqs.get_queue_by_name(QueueName=sqs_queue_name)16 17 self.TIME_LIMIT = False18 def invoke(self, payload):19 """20 Invoke -- return information about this invocation21 """22 MessageBody = json.dumps(payload)23 response = self.queue.send_message(MessageBody=MessageBody)24 # fixme return something25 def config(self):26 """27 Return config dict28 """29 return {'sqs_queue_name_name' : self.sqs_queue_name, 30 'region_name' : self.region_name}31def sqs_run_local(region_name, sqs_queue_name, job_num=1, 32 run_dir="/tmp/tasks"):33 """34 Simple code to run jobs from SQS locally35 USE ONLY FOR DEBUG 36 """37 sqs = boto3.resource('sqs', region_name=region_name)38 39 queue = sqs.get_queue_by_name(QueueName=sqs_queue_name)40 for job_i in range(job_num):41 42 while True:43 response = queue.receive_messages(WaitTimeSeconds=10, 44 MaxNumberOfMessages=1)45 if len(response) > 0:46 print("Dispatching")47 #pool.apply_async(48 m = response[0]49 job = json.loads(m.body)50 51 m.delete()52 local.local_handler([job], run_dir, 53 {'invoker' : 'SQSInvoker', 54 'job_i' : job_i})55 print("done with invocation")56 break57 else:58 print("no message, sleeping")59 time.sleep(4)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful