Best Python code snippet using localstack_python
provider.py
Source:provider.py  
...559        with self._mutex:560            if key not in self.queues:561                raise QueueDoesNotExist()562            return self.queues[key]563    def _require_queue_by_arn(self, queue_arn: str) -> SqsQueue:564        arn = parse_arn(queue_arn)565        key = QueueKey(region=arn["region"], account_id=arn["account"], name=arn["resource"])566        return self._require_queue(key)567    def _resolve_queue(568        self,569        context: RequestContext,570        queue_name: Optional[str] = None,571        queue_url: Optional[str] = None,572    ) -> SqsQueue:573        """574        Uses resolve_queue_key to determine the QueueKey from the given input, and returns the respective queue,575        or raises QueueDoesNotExist if it does not exist.576        :param context: the request context, used for getting region and account_id, and optionally the queue_url577        :param queue_name: the queue name (if this is set, then this will be used for the key)578        :param queue_url: the queue url (if name is not set, this will be used to determine the queue name)579        :returns: the queue580        :raises QueueDoesNotExist: if the queue does not exist581        """582        key = resolve_queue_key(context, queue_name, queue_url)583        return self._require_queue(key)584    def create_queue(585        self,586        context: RequestContext,587        queue_name: String,588        attributes: QueueAttributeMap = None,589        tags: TagMap = None,590    ) -> CreateQueueResult:591        fifo = attributes and (592            attributes.get(QueueAttributeName.FifoQueue, "false").lower() == "true"593        )594        k = QueueKey(context.region, context.account_id, queue_name)595        # Special Case TODO: why is an emtpy policy passed at all? 
same in set_queue_attributes596        if attributes and attributes.get(QueueAttributeName.Policy) == "":597            del attributes[QueueAttributeName.Policy]598        if k in self.queues:599            raise QueueNameExists(queue_name)600        if fifo:601            queue = FifoQueue(k, attributes, tags)602        else:603            queue = StandardQueue(k, attributes, tags)604        LOG.debug("creating queue key=%s attributes=%s tags=%s", k, attributes, tags)605        self._add_queue(queue)606        return CreateQueueResult(QueueUrl=queue.url(context))607    def get_queue_url(608        self, context: RequestContext, queue_name: String, queue_owner_aws_account_id: String = None609    ) -> GetQueueUrlResult:610        account_id = queue_owner_aws_account_id or context.account_id611        key = QueueKey(context.region, account_id, queue_name)612        if key not in self.queues:613            raise QueueDoesNotExist("The specified queue does not exist for this wsdl version.")614        queue = self.queues[key]615        self._assert_permission(context, queue)616        return GetQueueUrlResult(QueueUrl=queue.url(context))617    def list_queues(618        self,619        context: RequestContext,620        queue_name_prefix: String = None,621        next_token: Token = None,622        max_results: BoxedInteger = None,623    ) -> ListQueuesResult:624        urls = []625        for queue in self.queues.values():626            if queue.key.region != context.region:627                continue628            if queue.key.account_id != context.account_id:629                continue630            if queue_name_prefix:631                if not queue.name.startswith(queue_name_prefix):632                    continue633            urls.append(queue.url(context))634        if max_results:635            # FIXME: also need to solve pagination with stateful iterators: If the total number of items available is636            #  more than the value specified, a NextToken is 
provided in the command's output. To resume pagination,637            #  provide the NextToken value in the starting-token argument of a subsequent command. Do not use the638            #  NextToken response element directly outside of the AWS CLI.639            urls = urls[:max_results]640        return ListQueuesResult(QueueUrls=urls)641    def change_message_visibility(642        self,643        context: RequestContext,644        queue_url: String,645        receipt_handle: String,646        visibility_timeout: Integer,647    ) -> None:648        queue = self._resolve_queue(context, queue_url=queue_url)649        self._assert_permission(context, queue)650        queue.update_visibility_timeout(receipt_handle, visibility_timeout)651    def change_message_visibility_batch(652        self,653        context: RequestContext,654        queue_url: String,655        entries: ChangeMessageVisibilityBatchRequestEntryList,656    ) -> ChangeMessageVisibilityBatchResult:657        queue = self._resolve_queue(context, queue_url=queue_url)658        self._assert_permission(context, queue)659        self._assert_batch(entries)660        successful = []661        failed = []662        with queue.mutex:663            for entry in entries:664                try:665                    queue.update_visibility_timeout(666                        entry["ReceiptHandle"], entry["VisibilityTimeout"]667                    )668                    successful.append({"Id": entry["Id"]})669                except Exception as e:670                    failed.append(671                        BatchResultErrorEntry(672                            Id=entry["Id"],673                            SenderFault=False,674                            Code=e.__class__.__name__,675                            Message=str(e),676                        )677                    )678        return ChangeMessageVisibilityBatchResult(679            Successful=successful,680            Failed=failed,681        )682    
def delete_queue(self, context: RequestContext, queue_url: String) -> None:683        with self._mutex:684            queue = self._resolve_queue(context, queue_url=queue_url)685            self._assert_permission(context, queue)686            del self.queues[queue.key]687    def get_queue_attributes(688        self, context: RequestContext, queue_url: String, attribute_names: AttributeNameList = None689    ) -> GetQueueAttributesResult:690        queue = self._resolve_queue(context, queue_url=queue_url)691        self._assert_permission(context, queue)692        if not attribute_names:693            return GetQueueAttributesResult(Attributes={})694        if QueueAttributeName.All in attribute_names:695            # return GetQueueAttributesResult(Attributes=queue.attributes)696            attribute_names = queue.attributes.keys()697        result: Dict[QueueAttributeName, str] = {}698        for attr in attribute_names:699            try:700                getattr(QueueAttributeName, attr)701            except AttributeError:702                raise InvalidAttributeName(f"Unknown attribute {attr}.")703            if callable(queue.attributes.get(attr)):704                func = queue.attributes.get(attr)705                result[attr] = func()706            else:707                result[attr] = queue.attributes.get(attr)708        return GetQueueAttributesResult(Attributes=result)709    def send_message(710        self,711        context: RequestContext,712        queue_url: String,713        message_body: String,714        delay_seconds: Integer = None,715        message_attributes: MessageBodyAttributeMap = None,716        message_system_attributes: MessageBodySystemAttributeMap = None,717        message_deduplication_id: String = None,718        message_group_id: String = None,719    ) -> SendMessageResult:720        queue = self._resolve_queue(context, queue_url=queue_url)721        self._assert_permission(context, queue)722        message = 
self._put_message(723            queue,724            context,725            message_body,726            delay_seconds,727            message_attributes,728            message_system_attributes,729            message_deduplication_id,730            message_group_id,731        )732        return SendMessageResult(733            MessageId=message["MessageId"],734            MD5OfMessageBody=message["MD5OfBody"],735            MD5OfMessageAttributes=message.get("MD5OfMessageAttributes"),736            SequenceNumber=queue.generate_sequence_number(),737            MD5OfMessageSystemAttributes=_create_message_attribute_hash(message_system_attributes),738        )739    def send_message_batch(740        self, context: RequestContext, queue_url: String, entries: SendMessageBatchRequestEntryList741    ) -> SendMessageBatchResult:742        queue = self._resolve_queue(context, queue_url=queue_url)743        self._assert_permission(context, queue)744        self._assert_batch(entries)745        successful = []746        failed = []747        with queue.mutex:748            for entry in entries:749                try:750                    message = self._put_message(751                        queue,752                        context,753                        message_body=entry.get("MessageBody"),754                        delay_seconds=entry.get("DelaySeconds"),755                        message_attributes=entry.get("MessageAttributes"),756                        message_system_attributes=entry.get("MessageSystemAttributes"),757                        message_deduplication_id=entry.get("MessageDeduplicationId"),758                        message_group_id=entry.get("MessageGroupId"),759                    )760                    successful.append(761                        SendMessageBatchResultEntry(762                            Id=entry["Id"],763                            MessageId=message.get("MessageId"),764                            
MD5OfMessageBody=message.get("MD5OfBody"),765                            MD5OfMessageAttributes=message.get("MD5OfMessageAttributes"),766                            MD5OfMessageSystemAttributes=_create_message_attribute_hash(767                                message.get("message_system_attributes")768                            ),769                            SequenceNumber=queue.generate_sequence_number(),770                        )771                    )772                except Exception as e:773                    failed.append(774                        BatchResultErrorEntry(775                            Id=entry["Id"],776                            SenderFault=False,777                            Code=e.__class__.__name__,778                            Message=str(e),779                        )780                    )781        return SendMessageBatchResult(782            Successful=successful,783            Failed=failed,784        )785    def _put_message(786        self,787        queue: SqsQueue,788        context: RequestContext,789        message_body: String,790        delay_seconds: Integer = None,791        message_attributes: MessageBodyAttributeMap = None,792        message_system_attributes: MessageBodySystemAttributeMap = None,793        message_deduplication_id: String = None,794        message_group_id: String = None,795    ) -> Message:796        # TODO: default message attributes (SenderId, ApproximateFirstReceiveTimestamp, ...)797        check_message_content(message_body)798        check_attributes(message_attributes)799        check_attributes(message_system_attributes)800        check_fifo_id(message_deduplication_id)801        check_fifo_id(message_group_id)802        message: Message = Message(803            MessageId=generate_message_id(),804            MD5OfBody=md5(message_body),805            Body=message_body,806            Attributes=self._create_message_attributes(context, message_system_attributes),807            
MD5OfMessageAttributes=_create_message_attribute_hash(message_attributes),808            MessageAttributes=message_attributes,809        )810        delay_seconds = delay_seconds or queue.attributes.get(QueueAttributeName.DelaySeconds, "0")811        if int(delay_seconds):812            # FIXME: this is a pretty bad implementation (one thread per message...). polling on a priority queue813            #  would probably be better. We also need access to delayed messages for the814            #  ApproximateNumberrOfDelayedMessages attribute.815            threading.Timer(816                int(delay_seconds),817                queue.put,818                args=(message, message_deduplication_id, message_group_id),819            ).start()820        else:821            queue.put(822                message=message,823                message_deduplication_id=message_deduplication_id,824                message_group_id=message_group_id,825            )826        return message827    def receive_message(828        self,829        context: RequestContext,830        queue_url: String,831        attribute_names: AttributeNameList = None,832        message_attribute_names: MessageAttributeNameList = None,833        max_number_of_messages: Integer = None,834        visibility_timeout: Integer = None,835        wait_time_seconds: Integer = None,836        receive_request_attempt_id: String = None,837    ) -> ReceiveMessageResult:838        queue = self._resolve_queue(context, queue_url=queue_url)839        self._assert_permission(context, queue)840        num = max_number_of_messages or 1841        block = wait_time_seconds is not None842        # collect messages843        messages = []844        while num:845            try:846                standard_message = queue.get(847                    block=block, timeout=wait_time_seconds, visibility_timeout=visibility_timeout848                )849                msg = standard_message.message850            except Empty:851           
     break852            moved_to_dlq = False853            if (854                queue.attributes855                and queue.attributes.get(QueueAttributeName.RedrivePolicy) is not None856            ):857                moved_to_dlq = self._dead_letter_check(queue, standard_message, context)858            if moved_to_dlq:859                continue860            # filter attributes861            if message_attribute_names:862                if "All" not in message_attribute_names:863                    msg["MessageAttributes"] = {864                        k: v865                        for k, v in msg["MessageAttributes"].items()866                        if k in message_attribute_names867                    }868                # TODO: why is this called even if we receive "All" attributes?869                msg["MD5OfMessageAttributes"] = _create_message_attribute_hash(870                    msg["MessageAttributes"]871                )872            else:873                del msg["MessageAttributes"]874            # add message to result875            messages.append(msg)876            num -= 1877        # TODO: how does receiving behave if the queue was deleted in the meantime?878        return ReceiveMessageResult(Messages=messages)879    def _dead_letter_check(880        self, queue: SqsQueue, std_m: SqsMessage, context: RequestContext881    ) -> bool:882        redrive_policy = json.loads(queue.attributes.get(QueueAttributeName.RedrivePolicy))883        # TODO: include the names of the dictionary sub - attributes in the autogenerated code?884        max_receive_count = redrive_policy["maxReceiveCount"]885        if std_m.receive_times > max_receive_count:886            dead_letter_target_arn = redrive_policy["deadLetterTargetArn"]887            dl_queue = self._require_queue_by_arn(dead_letter_target_arn)888            # TODO: this needs to be atomic?889            dead_message = std_m.message890            dl_queue.put(message=dead_message)891           
 queue.remove(std_m.message["ReceiptHandle"])892            return True893        else:894            return False895    def delete_message(896        self, context: RequestContext, queue_url: String, receipt_handle: String897    ) -> None:898        queue = self._resolve_queue(context, queue_url=queue_url)899        self._assert_permission(context, queue)900        queue.remove(receipt_handle)901    def delete_message_batch(...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
