How to use get_queue_url method in localstack

Best Python code snippet using localstack_python

sqs.py

Source:sqs.py Github

copy

Full Screen

# NOTE: this excerpt starts in the middle of a helper whose header is not
# shown (presumably the get_client() that get_queue_url() calls below -- TODO
# confirm). Its visible tail is kept as a comment rather than guessed at:
#     client_lock.acquire()
#     ret = boto3.client('sqs')
#     client_lock.release()
#     return ret


def get_queue_url(queue_name: str, sqs_client=None) -> (str, Any):
    """Resolve a queue name to its URL.

    Returns a ``(queue_url, sqs_client)`` tuple. When no client is passed in,
    one is obtained from ``get_client()``; the client is handed back so the
    caller can reuse it for the follow-up request.
    """
    if sqs_client is None:
        sqs_client = get_client()
    response = sqs_client.get_queue_url(QueueName=queue_name)
    return response['QueueUrl'], sqs_client


def remove_message(queue_name: str, receipt_handle: str, sqs_client=None):
    """Delete a single message, identified by its receipt handle."""
    queue_url, sqs_client = get_queue_url(queue_name, sqs_client)
    sqs_client.delete_message(QueueUrl=queue_url, ReceiptHandle=receipt_handle)


def remove_messages(queue_name: str, receipt_handles: List[str], sqs_client=None):
    """Delete several messages, sending the handles in batches of 10."""
    queue_url, sqs_client = get_queue_url(queue_name, sqs_client)
    for start in range(0, len(receipt_handles), 10):
        chunk = receipt_handles[start:start + 10]
        # Entry ids restart at 0 for every batch request.
        entries = [{'Id': str(i), 'ReceiptHandle': handle} for i, handle in enumerate(chunk)]
        sqs_client.delete_message_batch(QueueUrl=queue_url, Entries=entries)


def return_message(queue_name: str, receipt_handle: str, sqs_client=None):
    """Hand a received message back to the queue by setting its visibility timeout to 0."""
    queue_url, sqs_client = get_queue_url(queue_name, sqs_client)
    sqs_client.change_message_visibility(QueueUrl=queue_url, ReceiptHandle=receipt_handle, VisibilityTimeout=0)


def return_messages(queue_name: str, receipt_handles: List[str], sqs_client=None):
    """Hand several received messages back to the queue, one request per handle."""
    queue_url, sqs_client = get_queue_url(queue_name, sqs_client)
    # TODO implement batch version
    for receipt_handle in receipt_handles:
        # The SQS client has no ``return_message`` method; do what
        # return_message() above does, reusing the already resolved queue URL.
        sqs_client.change_message_visibility(QueueUrl=queue_url, ReceiptHandle=receipt_handle, VisibilityTimeout=0)


def send_messages(queue_name: str, messages: list, group_id: Optional[str] = None, sqs_client=None) -> list:
    """Send ``messages`` to the queue in batches of up to 10.

    Each message's position in ``messages`` is used as its batch entry id.
    When ``group_id`` is given it is set as ``MessageGroupId`` on every entry.

    Returns: list of the messages that failed to send
    """
    queue_url, sqs_client = get_queue_url(queue_name, sqs_client)
    failed_ids = []
    for start in range(0, len(messages), 10):
        # prepare a batch of up to 10 messages
        batch_items = []
        for index in range(start, min(start + 10, len(messages))):
            item = {'Id': str(index), 'MessageBody': messages[index]}
            if group_id is not None:
                item['MessageGroupId'] = group_id
            batch_items.append(item)
        result = sqs_client.send_message_batch(QueueUrl=queue_url, Entries=batch_items)
        failed_ids.extend(int(item['Id']) for item in result.get('Failed') or [])
    # Map the failed entry ids back to the original messages.
    return [messages[i] for i in failed_ids]


def receive_messages(queue_name, max_messages=10, wait_seconds=0, sqs_client=None) -> Dict[str, str]:
    """Returns a dict of {message_handle: body}

    Requests at most 10 messages per call and stops as soon as a call returns
    no messages or ``max_messages`` have been requested in total.
    """
    queue_url, sqs_client = get_queue_url(queue_name, sqs_client)
    remaining_messages = max_messages
    result_messages = {}
    while remaining_messages > 0:
        request_messages = min(10, remaining_messages)
        result = sqs_client.receive_message(QueueUrl=queue_url,
                                            MaxNumberOfMessages=request_messages,
                                            WaitTimeSeconds=wait_seconds)
        # The budget shrinks by the number requested, not the number received.
        remaining_messages -= request_messages
        if not result.get('Messages'):
            # ku.log_trace(f'sqs: 0 messages read from {queue_name}')
            break
        new_results = {m['ReceiptHandle']: m['Body'] for m in result['Messages']}
        result_messages.update(new_results)
        # ku.log_trace(f'sqs: {len(new_results)} messages read from {queue_name}')
    return result_messages


def queue_attributes(queue_name, sqs_client=None) -> dict:
    """Return the ``Attributes`` mapping of the queue, requesting all attributes."""
    queue_url, sqs_client = get_queue_url(queue_name, sqs_client)
    result = sqs_client.get_queue_attributes(QueueUrl=queue_url, AttributeNames=['All'])
    return result['Attributes']


def purge_queue(queue_name, sqs_client=None):
    queue_url, sqs_client = get_queue_url(queue_name, sqs_client)
    ...  # the excerpt ends here; the rest of this function is not shown

Full Screen

Full Screen

sqs-test.py

Source:sqs-test.py Github

copy

Full Screen

# NOTE: this excerpt starts inside a function whose header is not shown; its
# visible tail is kept as a comment rather than guessed at:
#     if delete:
#         sqs_client.delete_message(QueueUrl=queue_url, ReceiptHandle=receipt_handle)
#         print(f'Deleted message with handle {receipt_handle}')
#     return f'MessageID: {message["MessageId"]} - Message: {message["Body"]}'


def get_queue_url(queue):
    """Return the URL of the queue named ``queue``, using the module-level ``sqs_client``."""
    return sqs_client.get_queue_url(QueueName=queue)['QueueUrl']


if __name__ == '__main__':
    # get_queue_url('SteveGilissenTestQueue')
    current_datetime = datetime.now().isoformat()
    # Despite its name, ``queue_name`` holds the queue URL returned by the lookup.
    queue_name = get_queue_url('SteveGilissenTestQueue')
    print('Sending test message...')
    print('Result: ' + send_test_message(f'New test sent on {current_datetime}', queue_name))
    print('Retrieve message...')
    ...  # the excerpt ends here; the rest of the script is not shown

Full Screen

Full Screen

handler.py

Source:handler.py Github

copy

Full Screen

# NOTE: the lines before this point are not shown in the excerpt.
sqs = boto3.client('sqs')
queue_name = 'SampleAppQueue.fifo'


def get_queue_name():
    """Return the name of the queue this handler works with."""
    return queue_name


def get_queue_url(sqs):
    """Look up the URL of the configured queue using the given SQS client."""
    response = sqs.get_queue_url(QueueName=get_queue_name())
    return response['QueueUrl']


def lambda_handler(event, context):
    """Process each record in ``event['Records']`` and delete its message.

    Every record is "processed" by sleeping for a random 40-120 seconds, after
    which the message is deleted using the record's ``receiptHandle``.
    """
    # Resolved lazily and only once per invocation: the queue is the same for
    # every record, so one lookup is enough.
    queue_url = None
    for record in event['Records']:
        # Stand-in for calling an API that can take a long time.
        process_time = random.randint(40, 120)
        logger.info("%d秒かかるAPIコールが発生しています。", process_time)
        time.sleep(process_time)
        # Delete the message.
        if queue_url is None:
            queue_url = get_queue_url(sqs)
        sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=record["receiptHandle"])
    # NOTE(review): placing the response after the loop is inferred; the
    # excerpt carries no indentation -- confirm against the original file.
    body = {
        "message": "Go Serverless v1.0! Your function executed successfully!",
        "input": event
    }
    response = {
        "statusCode": 200,
        "body": json.dumps(body)
    }
    ...  # the excerpt ends here; the rest of the handler is not shown

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful