Best Python code snippet using localstack_python
provider.py
Source:provider.py  
...172    inflight: Set[SqsMessage]173    receipts: Dict[str, SqsMessage]174    def __init__(self, key: QueueKey, attributes=None, tags=None) -> None:175        super().__init__()176        self._assert_queue_name(key.name)177        self.key = key178        self.tags = tags or {}179        self.visible = PriorityQueue()180        self.inflight = set()181        self.receipts = {}182        self.attributes = self.default_attributes()183        if attributes:184            self.attributes.update(attributes)185        self.purge_in_progress = False186        self.permissions = set()187        self.mutex = threading.RLock()188    def default_attributes(self) -> QueueAttributeMap:189        return {190            QueueAttributeName.QueueArn: self.arn,191            QueueAttributeName.ApproximateNumberOfMessages: self.visible._qsize,192            QueueAttributeName.ApproximateNumberOfMessagesNotVisible: lambda: len(self.inflight),193            QueueAttributeName.ApproximateNumberOfMessagesDelayed: "0",  # FIXME: this should also be callable194            QueueAttributeName.CreatedTimestamp: str(now()),195            QueueAttributeName.LastModifiedTimestamp: str(now()),196            QueueAttributeName.VisibilityTimeout: "30",197            QueueAttributeName.MaximumMessageSize: "262144",198            QueueAttributeName.MessageRetentionPeriod: "345600",199            QueueAttributeName.DelaySeconds: "0",200            QueueAttributeName.ReceiveMessageWaitTimeSeconds: "0",201        }202    def update_last_modified(self, timestamp: int = None):203        if timestamp is None:204            timestamp = now()205        self.attributes[QueueAttributeName.LastModifiedTimestamp] = str(timestamp)206    @property207    def name(self):208        return self.key.name209    @property210    def owner(self):211        return self.key.account_id212    @property213    def arn(self) -> str:214        return f"arn:aws:sqs:{self.key.region}:{self.key.account_id}:{self.key.name}"215    def url(self, context: RequestContext) -> str:216        """Return queue URL using either SQS_PORT_EXTERNAL (if configured), or based on the 'Host' request header"""217        host_url = context.request.host_url218        if config.SQS_PORT_EXTERNAL:219            host_url = external_service_url("sqs")220        return "{host}/{account_id}/{name}".format(221            host=host_url.rstrip("/"),222            account_id=self.key.account_id,223            name=self.key.name,224        )225    @property226    def visibility_timeout(self) -> int:227        return int(self.attributes[QueueAttributeName.VisibilityTimeout])228    def update_visibility_timeout(self, receipt_handle: str, visibility_timeout: int):229        with self.mutex:230            if receipt_handle not in self.receipts:231                raise ReceiptHandleIsInvalid(232                    f'The input receipt handle "{receipt_handle}" is not a valid receipt handle.'233                )234            standard_message = self.receipts[receipt_handle]235            if standard_message not in self.inflight:236                raise MessageNotInflight()237            standard_message.visibility_timeout = visibility_timeout238            if visibility_timeout == 0:239                LOG.info(240                    "terminating the visibility timeout of %s",241                    standard_message.message["MessageId"],242                )243                # Terminating the visibility timeout for a message244                # https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html#terminating-message-visibility-timeout245                self.inflight.remove(standard_message)246                self.visible.put_nowait(standard_message)247    def remove(self, receipt_handle: str):248        with self.mutex:249            if receipt_handle not in self.receipts:250                LOG.debug(251                    "no in-flight message found for receipt handle %s in queue %s",252                    receipt_handle,253                    self.arn,254                )255                return256            standard_message = self.receipts[receipt_handle]257            standard_message.deleted = True258            LOG.debug(259                "deleting message %s from queue %s", standard_message.message["MessageId"], self.arn260            )261            # remove all handles262            for handle in standard_message.receipt_handles:263                del self.receipts[handle]264            standard_message.receipt_handles.clear()265            # remove in-flight message266            try:267                self.inflight.remove(standard_message)268            except KeyError:269                # this means the message was re-queued in the meantime270                # TODO: remove this message from the visible queue if it exists: a message can be removed with an old271                #  receipt handle that was issued before the message was put back in the visible queue.272                self.visible.queue.remove(standard_message)273                heapq.heapify(self.visible.queue)274                pass275    def put(276        self,277        message: Message,278        visibility_timeout: int = None,279        message_deduplication_id: str = None,280        message_group_id: str = None,281    ):282        raise NotImplementedError283    def get(self, block=True, timeout=None, visibility_timeout: int = None) -> SqsMessage:284        start = time.time()285        while True:286            standard_message: SqsMessage = self.visible.get(block=block, timeout=timeout)287            LOG.debug(288                "de-queued message %s from %s", standard_message.message["MessageId"], self.arn289            )290            with self.mutex:291                if standard_message.deleted:292                    # TODO: check what the behavior of AWS is here. should we return a deleted message?293                    timeout -= time.time() - start294                    if timeout < 0:295                        timeout = 0296                    continue297                # update message attributes298                standard_message.visibility_timeout = (299                    self.visibility_timeout if visibility_timeout is None else visibility_timeout300                )301                standard_message.receive_times += 1302                standard_message.last_received = time.time()303                if standard_message.first_received is None:304                    standard_message.first_received = standard_message.last_received305                # create and manage receipt handle306                receipt_handle = generate_receipt_handle()307                standard_message.receipt_handles.add(receipt_handle)308                self.receipts[receipt_handle] = standard_message309                if standard_message.visibility_timeout == 0:310                    self.visible.put_nowait(standard_message)311                else:312                    self.inflight.add(standard_message)313                # prepare message for receiver314                # TODO: update message attributes (ApproximateFirstReceiveTimestamp, ApproximateReceiveCount)315                copied_message = copy.deepcopy(standard_message)316                copied_message.message["ReceiptHandle"] = receipt_handle317            return copied_message318    def requeue_inflight_messages(self):319        if not self.inflight:320            return321        with self.mutex:322            messages = list(self.inflight)323            for standard_message in messages:324                if standard_message.is_visible:325                    LOG.debug(326                        "re-queueing inflight messages %s into queue %s",327                        standard_message.message["MessageId"],328                        self.arn,329                    )330                    self.inflight.remove(standard_message)331                    self.visible.put_nowait(standard_message)332    def _assert_queue_name(self, name):333        if not re.match(r"^[a-zA-Z0-9_-]{1,80}$", name):334            raise InvalidParameterValue(335                "Can only include alphanumeric characters, hyphens, or underscores. 1 to 80 in length"336            )337    def validate_queue_attributes(self, attributes):338        valid = [k[1] for k in inspect.getmembers(QueueAttributeName)]339        del valid[valid.index(QueueAttributeName.FifoQueue)]340        for k in attributes.keys():341            if k not in valid:342                raise InvalidAttributeName(f"Unknown attribute name {k}")343    def generate_sequence_number(self):344        return None345class StandardQueue(SqsQueue):346    def put(347        self,348        message: Message,349        visibility_timeout: int = None,350        message_deduplication_id: str = None,351        message_group_id: str = None,352    ):353        if message_deduplication_id:354            raise InvalidParameterValue(355                f"Value {message_deduplication_id} for parameter MessageDeduplicationId is invalid. Reason: The "356                f"request includes a parameter that is not valid for this queue type. "357            )358        if message_group_id:359            raise InvalidParameterValue(360                f"Value {message_group_id} for parameter MessageGroupId is invalid. Reason: The request includes a "361                f"parameter that is not valid for this queue type. "362            )363        standard_message = SqsMessage(time.time(), message)364        if visibility_timeout is not None:365            standard_message.visibility_timeout = visibility_timeout366        else:367            # use the attribute from the queue368            standard_message.visibility_timeout = self.visibility_timeout369        self.visible.put_nowait(standard_message)370class FifoQueue(SqsQueue):371    visible: PriorityQueue372    inflight: Set[SqsMessage]373    receipts: Dict[str, SqsMessage]374    deduplication: Dict[str, Dict[str, SqsMessage]]375    def __init__(self, key: QueueKey, attributes=None, tags=None) -> None:376        super().__init__(key, attributes, tags)377        self.deduplication = {}378    def put(379        self,380        message: Message,381        visibility_timeout: int = None,382        message_deduplication_id: str = None,383        message_group_id: str = None,384    ):385        if not message_group_id:386            raise MissingParameter("The request must contain the parameter MessageGroupId.")387        dedup_id = message_deduplication_id388        content_based_deduplication = (389            "true"390            == (self.attributes.get(QueueAttributeName.ContentBasedDeduplication, "false")).lower()391        )392        if not dedup_id and content_based_deduplication:393            dedup_id = hashlib.sha256(message.get("Body").encode("utf-8")).hexdigest()394        if not dedup_id:395            raise InvalidParameterValue(396                "The Queue should either have ContentBasedDeduplication enabled or MessageDeduplicationId provided "397                "explicitly "398            )399        qm = SqsMessage(400            time.time(),401            message,402            message_deduplication_id=dedup_id,403            message_group_id=message_group_id,404        )405        if visibility_timeout is not None:406            qm.visibility_timeout = visibility_timeout407        else:408            # use the attribute from the queue409            qm.visibility_timeout = self.visibility_timeout410        original_message = None411        original_message_group = self.deduplication.get(message_group_id)412        if original_message_group:413            original_message = original_message_group.get(dedup_id)414        if (415            original_message416            and not original_message.deleted417            and original_message.priority + DEDUPLICATION_INTERVAL_IN_SEC > qm.priority418        ):419            message["MessageId"] = original_message.message["MessageId"]420        else:421            self.visible.put_nowait(qm)422            if not original_message_group:423                self.deduplication[message_group_id] = {}424            self.deduplication[message_group_id][message_deduplication_id] = qm425    def _assert_queue_name(self, name):426        if not name.endswith(".fifo"):427            raise InvalidParameterValue(428                "The name of a FIFO queue can only include alphanumeric characters, hyphens, or underscores, "429                "must end with .fifo suffix and be 1 to 80 in length"430            )431        # The .fifo suffix counts towards the 80-character queue name quota.432        queue_name = name[:-5] + "_fifo"433        super()._assert_queue_name(queue_name)434    def validate_queue_attributes(self, attributes):435        valid = [k[1] for k in inspect.getmembers(QueueAttributeName)]436        for k in attributes.keys():437            if k not in valid:438                raise InvalidAttributeName(f"Unknown attribute name {k}")439        # Special Cases440        fifo = attributes.get(QueueAttributeName.FifoQueue)441        if fifo and fifo.lower() != "true":442            raise InvalidAttributeValue(443                "Invalid value for the parameter FifoQueue. Reason: Modifying queue type is not supported."444            )445    # TODO: If we ever actually need to do something with this number, it needs to be part of446    #   SQSMessage. This means changing all *put*() signatures to return the saved message.447    def generate_sequence_number(self):...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
