Best Python code snippet using localstack_python
provider.py
Source:provider.py  
...511                check_message_content(attribute_value)512            except InvalidMessageContents as e:513                # AWS throws a different exception here514                raise InvalidParameterValue(e.args[0])515def check_fifo_id(fifo_id):516    if not fifo_id:517        return518    if len(fifo_id) >= 128:519        raise InvalidParameterValue(520            "Message deduplication ID and group ID must be shorter than 128 bytes"521        )522    if not re.match(FIFO_MSG_REGEX, fifo_id):523        raise InvalidParameterValue(524            "Invalid characters found. Deduplication ID and group ID can only contain"525            "alphanumeric characters as well as TODO"526        )527class SqsProvider(SqsApi, ServiceLifecycleHook):528    """529    LocalStack SQS Provider.530    LIMITATIONS:531        - Pagination of results (NextToken)532        - Delivery guarantees533        - The region is not encoded in the queue URL534    """535    queues: Dict[QueueKey, SqsQueue]536    def __init__(self) -> None:537        super().__init__()538        self.queues = {}539        self._mutex = threading.RLock()540        self._inflight_worker = InflightUpdateWorker(self.queues)541    def start(self):542        self._inflight_worker.start()543    def shutdown(self):544        self._inflight_worker.stop()545    def on_before_start(self):546        self.start()547    def on_before_stop(self):548        self.shutdown()549    def _add_queue(self, queue: SqsQueue):550        with self._mutex:551            self.queues[queue.key] = queue552    def _require_queue(self, key: QueueKey) -> SqsQueue:553        """554        Returns the queue for the given key, or raises QueueDoesNotExist if it does not exist.555        :param key: the QueueKey to look for556        :returns: the queue557        :raises QueueDoesNotExist: if the queue does not exist558        """559        with self._mutex:560            if key not in self.queues:561                raise QueueDoesNotExist()562            return self.queues[key]563    def _require_queue_by_arn(self, queue_arn: str) -> SqsQueue:564        arn = parse_arn(queue_arn)565        key = QueueKey(region=arn["region"], account_id=arn["account"], name=arn["resource"])566        return self._require_queue(key)567    def _resolve_queue(568        self,569        context: RequestContext,570        queue_name: Optional[str] = None,571        queue_url: Optional[str] = None,572    ) -> SqsQueue:573        """574        Uses resolve_queue_key to determine the QueueKey from the given input, and returns the respective queue,575        or raises QueueDoesNotExist if it does not exist.576        :param context: the request context, used for getting region and account_id, and optionally the queue_url577        :param queue_name: the queue name (if this is set, then this will be used for the key)578        :param queue_url: the queue url (if name is not set, this will be used to determine the queue name)579        :returns: the queue580        :raises QueueDoesNotExist: if the queue does not exist581        """582        key = resolve_queue_key(context, queue_name, queue_url)583        return self._require_queue(key)584    def create_queue(585        self,586        context: RequestContext,587        queue_name: String,588        attributes: QueueAttributeMap = None,589        tags: TagMap = None,590    ) -> CreateQueueResult:591        fifo = attributes and (592            attributes.get(QueueAttributeName.FifoQueue, "false").lower() == "true"593        )594        k = QueueKey(context.region, context.account_id, queue_name)595        # Special Case TODO: why is an emtpy policy passed at all? same in set_queue_attributes596        if attributes and attributes.get(QueueAttributeName.Policy) == "":597            del attributes[QueueAttributeName.Policy]598        if k in self.queues:599            raise QueueNameExists(queue_name)600        if fifo:601            queue = FifoQueue(k, attributes, tags)602        else:603            queue = StandardQueue(k, attributes, tags)604        LOG.debug("creating queue key=%s attributes=%s tags=%s", k, attributes, tags)605        self._add_queue(queue)606        return CreateQueueResult(QueueUrl=queue.url(context))607    def get_queue_url(608        self, context: RequestContext, queue_name: String, queue_owner_aws_account_id: String = None609    ) -> GetQueueUrlResult:610        account_id = queue_owner_aws_account_id or context.account_id611        key = QueueKey(context.region, account_id, queue_name)612        if key not in self.queues:613            raise QueueDoesNotExist("The specified queue does not exist for this wsdl version.")614        queue = self.queues[key]615        self._assert_permission(context, queue)616        return GetQueueUrlResult(QueueUrl=queue.url(context))617    def list_queues(618        self,619        context: RequestContext,620        queue_name_prefix: String = None,621        next_token: Token = None,622        max_results: BoxedInteger = None,623    ) -> ListQueuesResult:624        urls = []625        for queue in self.queues.values():626            if queue.key.region != context.region:627                continue628            if queue.key.account_id != context.account_id:629                continue630            if queue_name_prefix:631                if not queue.name.startswith(queue_name_prefix):632                    continue633            urls.append(queue.url(context))634        if max_results:635            # FIXME: also need to solve pagination with stateful iterators: If the total number of items available is636            #  more than the value specified, a NextToken is provided in the command's output. To resume pagination,637            #  provide the NextToken value in the starting-token argument of a subsequent command. Do not use the638            #  NextToken response element directly outside of the AWS CLI.639            urls = urls[:max_results]640        return ListQueuesResult(QueueUrls=urls)641    def change_message_visibility(642        self,643        context: RequestContext,644        queue_url: String,645        receipt_handle: String,646        visibility_timeout: Integer,647    ) -> None:648        queue = self._resolve_queue(context, queue_url=queue_url)649        self._assert_permission(context, queue)650        queue.update_visibility_timeout(receipt_handle, visibility_timeout)651    def change_message_visibility_batch(652        self,653        context: RequestContext,654        queue_url: String,655        entries: ChangeMessageVisibilityBatchRequestEntryList,656    ) -> ChangeMessageVisibilityBatchResult:657        queue = self._resolve_queue(context, queue_url=queue_url)658        self._assert_permission(context, queue)659        self._assert_batch(entries)660        successful = []661        failed = []662        with queue.mutex:663            for entry in entries:664                try:665                    queue.update_visibility_timeout(666                        entry["ReceiptHandle"], entry["VisibilityTimeout"]667                    )668                    successful.append({"Id": entry["Id"]})669                except Exception as e:670                    failed.append(671                        BatchResultErrorEntry(672                            Id=entry["Id"],673                            SenderFault=False,674                            Code=e.__class__.__name__,675                            Message=str(e),676                        )677                    )678        return ChangeMessageVisibilityBatchResult(679            Successful=successful,680            Failed=failed,681        )682    def delete_queue(self, context: RequestContext, queue_url: String) -> None:683        with self._mutex:684            queue = self._resolve_queue(context, queue_url=queue_url)685            self._assert_permission(context, queue)686            del self.queues[queue.key]687    def get_queue_attributes(688        self, context: RequestContext, queue_url: String, attribute_names: AttributeNameList = None689    ) -> GetQueueAttributesResult:690        queue = self._resolve_queue(context, queue_url=queue_url)691        self._assert_permission(context, queue)692        if not attribute_names:693            return GetQueueAttributesResult(Attributes={})694        if QueueAttributeName.All in attribute_names:695            # return GetQueueAttributesResult(Attributes=queue.attributes)696            attribute_names = queue.attributes.keys()697        result: Dict[QueueAttributeName, str] = {}698        for attr in attribute_names:699            try:700                getattr(QueueAttributeName, attr)701            except AttributeError:702                raise InvalidAttributeName(f"Unknown attribute {attr}.")703            if callable(queue.attributes.get(attr)):704                func = queue.attributes.get(attr)705                result[attr] = func()706            else:707                result[attr] = queue.attributes.get(attr)708        return GetQueueAttributesResult(Attributes=result)709    def send_message(710        self,711        context: RequestContext,712        queue_url: String,713        message_body: String,714        delay_seconds: Integer = None,715        message_attributes: MessageBodyAttributeMap = None,716        message_system_attributes: MessageBodySystemAttributeMap = None,717        message_deduplication_id: String = None,718        message_group_id: String = None,719    ) -> SendMessageResult:720        queue = self._resolve_queue(context, queue_url=queue_url)721        self._assert_permission(context, queue)722        message = self._put_message(723            queue,724            context,725            message_body,726            delay_seconds,727            message_attributes,728            message_system_attributes,729            message_deduplication_id,730            message_group_id,731        )732        return SendMessageResult(733            MessageId=message["MessageId"],734            MD5OfMessageBody=message["MD5OfBody"],735            MD5OfMessageAttributes=message.get("MD5OfMessageAttributes"),736            SequenceNumber=queue.generate_sequence_number(),737            MD5OfMessageSystemAttributes=_create_message_attribute_hash(message_system_attributes),738        )739    def send_message_batch(740        self, context: RequestContext, queue_url: String, entries: SendMessageBatchRequestEntryList741    ) -> SendMessageBatchResult:742        queue = self._resolve_queue(context, queue_url=queue_url)743        self._assert_permission(context, queue)744        self._assert_batch(entries)745        successful = []746        failed = []747        with queue.mutex:748            for entry in entries:749                try:750                    message = self._put_message(751                        queue,752                        context,753                        message_body=entry.get("MessageBody"),754                        delay_seconds=entry.get("DelaySeconds"),755                        message_attributes=entry.get("MessageAttributes"),756                        message_system_attributes=entry.get("MessageSystemAttributes"),757                        message_deduplication_id=entry.get("MessageDeduplicationId"),758                        message_group_id=entry.get("MessageGroupId"),759                    )760                    successful.append(761                        SendMessageBatchResultEntry(762                            Id=entry["Id"],763                            MessageId=message.get("MessageId"),764                            MD5OfMessageBody=message.get("MD5OfBody"),765                            MD5OfMessageAttributes=message.get("MD5OfMessageAttributes"),766                            MD5OfMessageSystemAttributes=_create_message_attribute_hash(767                                message.get("message_system_attributes")768                            ),769                            SequenceNumber=queue.generate_sequence_number(),770                        )771                    )772                except Exception as e:773                    failed.append(774                        BatchResultErrorEntry(775                            Id=entry["Id"],776                            SenderFault=False,777                            Code=e.__class__.__name__,778                            Message=str(e),779                        )780                    )781        return SendMessageBatchResult(782            Successful=successful,783            Failed=failed,784        )785    def _put_message(786        self,787        queue: SqsQueue,788        context: RequestContext,789        message_body: String,790        delay_seconds: Integer = None,791        message_attributes: MessageBodyAttributeMap = None,792        message_system_attributes: MessageBodySystemAttributeMap = None,793        message_deduplication_id: String = None,794        message_group_id: String = None,795    ) -> Message:796        # TODO: default message attributes (SenderId, ApproximateFirstReceiveTimestamp, ...)797        check_message_content(message_body)798        check_attributes(message_attributes)799        check_attributes(message_system_attributes)800        check_fifo_id(message_deduplication_id)801        check_fifo_id(message_group_id)802        message: Message = Message(803            MessageId=generate_message_id(),804            MD5OfBody=md5(message_body),805            Body=message_body,806            Attributes=self._create_message_attributes(context, message_system_attributes),807            MD5OfMessageAttributes=_create_message_attribute_hash(message_attributes),808            MessageAttributes=message_attributes,809        )810        delay_seconds = delay_seconds or queue.attributes.get(QueueAttributeName.DelaySeconds, "0")811        if int(delay_seconds):812            # FIXME: this is a pretty bad implementation (one thread per message...). polling on a priority queue813            #  would probably be better. We also need access to delayed messages for the814            #  ApproximateNumberrOfDelayedMessages attribute.815            threading.Timer(...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
