How to use _queue_url method in localstack

Best Python code snippet using localstack_python

runner.py

Source:runner.py Github

copy

Full Screen

#==============================================================================
# Copyright 2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#==============================================================================
from threading import Thread
from Queue import Queue
import logging

from .vendored.botocore import session as bc_session
from .resources import Message, ResourceEvent, CustomResource

log = logging.getLogger("cfn.resourcebridge")


class CfnBridge(object):
    """Dispatches custom resource events from polled queues to their handlers.

    One ``QueuePollTask`` is queued per distinct queue URL; worker threads
    started by ``process_messages`` execute the queued tasks.
    """

    def __init__(self, custom_resources, num_threads=None):
        """Build the resource lookup table and queue one poll task per queue.

        :param custom_resources: sized iterable of resources exposing
            ``queue_url``, ``service_token``, ``resource_type``, ``region``,
            ``name`` and ``source_file``.
        :param num_threads: number of worker threads; when falsy a default is
            derived from the number of custom resources.
        :raises ValueError: if two resources map to the same
            (queue_url, service_token, resource_type) key.
        """
        # Lookup of queue to resources
        self._resource_lookup = {}

        # Construct an unbounded Queue to hold our pending tasks
        self._task_queue = Queue()

        # List of queues already being polled.
        queues = set()

        # Build our resource lookup and queue list
        for new_res in custom_resources:
            # Generate a lookup key for the custom_resource
            lookup = LookupKey((new_res.queue_url, new_res.service_token, new_res.resource_type))

            # Check if there is an existing resource that already maps exactly to this resource.
            existing_res = self._resource_lookup.get(lookup)
            if existing_res:
                raise ValueError(u"[%s] section in '%s' handles the same events as [%s] in '%s'" %
                                 (new_res.name, new_res.source_file, existing_res.name, existing_res.source_file))

            # Add our resource into the lookup
            self._resource_lookup[lookup] = new_res

            # Determine if this queue has been setup for polling
            if new_res.queue_url not in queues:
                queues.add(new_res.queue_url)

                # Construct a task to poll the new queue
                self._task_queue.put(QueuePollTask(new_res.queue_url, new_res.region, self._resource_lookup))

        # Determine the maximum number of threads to use
        count = len(custom_resources)
        self._num_threads = num_threads if num_threads else count + min(count * 3, 10)

        # Display a warning if polling/processing threads will be shared (meaning we can't poll & work)
        if self._num_threads <= count:
            # Logger.warning is the supported spelling; Logger.warn is a deprecated alias.
            log.warning(u"You have %s custom resource(s) and %s thread(s). There may be degraded performance "
                        u"as polling threads will have to be shared with processing threads.",
                        count, self._num_threads)

    def process_messages(self):
        """Start the daemon worker threads; returns without waiting for them."""
        for _ in range(self._num_threads):
            worker = Thread(target=self.task_worker)
            worker.daemon = True
            worker.start()

    def task_worker(self):
        """Worker loop: take a task, run it, and enqueue any follow-up tasks.

        Poll tasks are put back on the queue after every run, whether or not
        the run succeeded, so each queue keeps being polled.
        """
        while True:
            task = self._task_queue.get()
            try:
                new_tasks = task.execute_task()
                if new_tasks:
                    for t in new_tasks:
                        self._task_queue.put(t)
            except Exception:
                # Log and keep the worker alive; SystemExit and friends are not swallowed.
                log.exception(u"Failed executing task")
            finally:
                self._task_queue.task_done()

                # Reschedule the polling tasks
                if isinstance(task, QueuePollTask):
                    self._task_queue.put(task)


class LookupKey(object):
    """Hashable wrapper around a tuple of properties, used as a dict key."""

    def __init__(self, properties_tuple):
        self._properties = properties_tuple

    @property
    def properties(self):
        return self._properties

    def __eq__(self, other):
        # Defer to the other operand instead of failing on unrelated types.
        if not isinstance(other, LookupKey):
            return NotImplemented
        return self.properties == other.properties

    def __ne__(self, other):
        # Python 2 does not derive != from ==, so keep the two consistent explicitly.
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    def __hash__(self):
        return hash(self._properties)

    def __repr__(self):
        return str(self._properties)


class BaseTask(object):
    """Base class for units of work run by ``CfnBridge.task_worker``."""

    def execute_task(self):
        """Run the task; may return an iterable of follow-up tasks (or None)."""
        pass


class QueuePollTask(BaseTask):
    """Polls a single queue and turns received events into handler tasks."""

    def __init__(self, queue_url, region, custom_resource_lookup):
        """
        :param queue_url: URL of the queue to poll.
        :param region: region passed to the SQS endpoint and to ``Message``.
        :param custom_resource_lookup: dict mapping ``LookupKey`` to resource.
        """
        self._queue_url = queue_url
        self._region = region
        self._resource_lookup = custom_resource_lookup

    def retrieve_events(self, max_events=1):
        """Attempts to retrieve events from the provided SQS queue.

        :param max_events: value sent as ``max_number_of_messages``.
        :returns: list of ``ResourceEvent`` objects; empty when the call does
            not return HTTP 200. Messages that cannot be parsed into an event
            are deleted from the queue and skipped.
        """
        session = bc_session.get_session()
        sqs = session.get_service("sqs")
        receive = sqs.get_operation("ReceiveMessage")
        http_response, response_data = receive.call(sqs.get_endpoint(self._region),
                                                    queue_url=self._queue_url,
                                                    wait_time_seconds=20,
                                                    max_number_of_messages=max_events)

        # Swallow up any errors/issues, logging them out
        if http_response.status_code != 200:
            log.error(u"Failed to retrieve messages from queue %s with status_code %s: %s",
                      self._queue_url, http_response.status_code, response_data)
            return []

        events = []
        for msg in response_data.get("Messages", []):
            # Construct a message that we can parse into events.
            message = Message(self._queue_url, self._region, msg)

            try:
                # Try to parse our message as a custom resource event
                event = ResourceEvent(message)
                events.append(event)
            except Exception:
                log.exception(u"Invalid message received; will delete from queue: %s", msg)
                message.delete()

        return events

    def _find_resource(self, queue_url, service_token, resource_type):
        """Return the resource registered for the exact key, or None."""
        lookup = LookupKey((queue_url, service_token, resource_type))
        log.debug(u"Trying to locate find resource for %s", lookup)
        return self._resource_lookup.get(lookup)

    def execute_task(self):
        """Poll the queue once and build a ``ResourceEventTask`` per handled event.

        :returns: list of tasks for events that have a registered handler.
            Events without a handler are logged and left on the queue.
        """
        log.debug(u"Checking queue %s", self._queue_url)
        events = self.retrieve_events()

        tasks = []
        for event in events:
            service_token = event.get('ServiceToken')
            resource_type = event.resource_type

            # Try to locate a handler for our event, starting with most specific lookup first
            candidates = (
                (service_token, resource_type),
                (None, resource_type),
                (service_token, None),
                (None, None),
            )
            resource = None
            for token, rtype in candidates:
                resource = self._find_resource(self._queue_url, token, rtype)
                if resource:
                    break

            # Handle the event using the found resource
            if resource:
                event.increase_timeout(resource.determine_event_timeout(event))
                tasks.append(ResourceEventTask(resource, event))
            else:
                # No handler, log an error and leave the message on the queue.
                log.error(u"Unable to find handler for Event from %s with ServiceToken(%s) and ResourceType(%s); "
                          u"event will be left on queue",
                          self._queue_url, service_token, resource_type)
                log.debug(u"Unhandled event: %s", event)
                # list(dict) yields the keys on both Python 2 and Python 3.
                log.debug(u"Registered handlers: %s", list(self._resource_lookup))

        return tasks


class ResourceEventTask(BaseTask):
    """Processes one event with its custom resource, then deletes the event."""

    def __init__(self, custom_resource, event):
        self._custom_resource = custom_resource
        self._event = event

    def execute_task(self):
        """Hand the event to the resource; delete it only if processing did not raise."""
        log.info(u"%s: Executing task for event %s", self._custom_resource.name, self._event)
        self._custom_resource.process_event(self._event)
        self._event.delete()

# ... (the source listing appears to be truncated here)

Full Screen

Full Screen

sqs.py

Source:sqs.py Github

copy

Full Screen

...34 Open queue35 """36 client = await aio_create_client("sqs", self.aws_config, **self.client_args)37 try:38 response = await client.get_queue_url(QueueName=self.queue_name)39 except botocore.exceptions.ClientError as ex:40 await client.close()41 error_code = ex.response["Error"]["Code"]42 if error_code == "AWS.SimpleQueueService.NonExistentQueue":43 raise QueueNotFound(f"Unable to find queue `{self.queue_name}`")44 raise ClientError(error_code) from ex45 except Exception as ex:46 await client.close()47 raise ClientError() from ex48 self._client = client49 self._queue_url = response["QueueUrl"]50 async def close(self):51 """52 Close the queue...

Full Screen

Full Screen

thloud.py

Source:thloud.py Github

copy

Full Screen

...36 super().__init__()37 _sqs = boto3.client('sqs')38 _queue_name = 'thloud'39 try:40 result = _sqs.get_queue_url(QueueName=_queue_name)41 global _queue_url42 _queue_url = result['QueueUrl']43 except Exception as e:44 _sqs.create_queue(QueueName=_queue_name, Attributes={})45 _queue_url = _sqs.get_queue_url(QueueName=_queue_name)46 def apply_async(self, func, args=(), kwds={}, callback=None, error_callback=None):47 dumps = pickle.dumps({"func": func, "args": args, "kwds": kwds})48 encodebytes = base64.encodebytes(dumps)49 payload = str(encodebytes, "utf-8")50 send_message_result = _sqs.send_message(QueueUrl=_queue_url, MessageBody=payload, DelaySeconds=123)51 message_id: str = send_message_result['MessageId']52 return CloudTask(message_id)53 print(6)54pool = CloudPool()55def span(func, *args, **kwargs):56 aresult = pool.apply_async(func, (*args,), kwds={**kwargs})57 return aresult58class Loop(object):59 def run_forever(self):...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful