Best Python code snippet using localstack_python
provider.py
Source:provider.py  
...443                    self.arn,444                )445                self.inflight.remove(standard_message)446                self.visible.put_nowait(standard_message)447    def enqueue_delayed_messages(self):448        if not self.delayed:449            return450        with self.mutex:451            messages = [message for message in self.delayed if not message.is_delayed]452            for standard_message in messages:453                LOG.debug(454                    "enqueueing delayed messages %s into queue %s",455                    standard_message.message["MessageId"],456                    self.arn,457                )458                self.delayed.remove(standard_message)459                self.visible.put_nowait(standard_message)460    def _assert_queue_name(self, name):461        if not re.match(r"^[a-zA-Z0-9_-]{1,80}$", name):462            raise InvalidParameterValue(463                "Can only include alphanumeric characters, hyphens, or underscores. 1 to 80 in length"464            )465    def validate_queue_attributes(self, attributes):466        valid = [k[1] for k in inspect.getmembers(QueueAttributeName)]467        del valid[valid.index(QueueAttributeName.FifoQueue)]468        for k in attributes.keys():469            if k not in valid:470                raise InvalidAttributeName(f"Unknown Attribute {k}.")471    def generate_sequence_number(self):472        return None473class StandardQueue(SqsQueue):474    def put(475        self,476        message: Message,477        visibility_timeout: int = None,478        message_deduplication_id: str = None,479        message_group_id: str = None,480        delay_seconds: int = None,481    ):482        if message_deduplication_id:483            raise InvalidParameterValue(484                f"Value {message_deduplication_id} for parameter MessageDeduplicationId is invalid. Reason: The "485                f"request includes a parameter that is not valid for this queue type. 
"486            )487        if message_group_id:488            raise InvalidParameterValue(489                f"Value {message_group_id} for parameter MessageGroupId is invalid. Reason: The request includes a "490                f"parameter that is not valid for this queue type. "491            )492        standard_message = SqsMessage(time.time(), message)493        if visibility_timeout is not None:494            standard_message.visibility_timeout = visibility_timeout495        else:496            # use the attribute from the queue497            standard_message.visibility_timeout = self.visibility_timeout498        if delay_seconds is not None:499            standard_message.delay_seconds = delay_seconds500        else:501            standard_message.delay_seconds = self.delay_seconds502        if standard_message.is_delayed:503            self.delayed.add(standard_message)504        else:505            self.visible.put_nowait(standard_message)506class FifoQueue(SqsQueue):507    deduplication: Dict[str, Dict[str, SqsMessage]]508    def __init__(self, name: str, region: str, account_id: str, attributes=None, tags=None) -> None:509        super().__init__(name, region, account_id, attributes, tags)510        self.deduplication = {}511    def update_delay_seconds(self, value: int):512        super(FifoQueue, self).update_delay_seconds(value)513        for message in self.delayed:514            message.delay_seconds = value515    def put(516        self,517        message: Message,518        visibility_timeout: int = None,519        message_deduplication_id: str = None,520        message_group_id: str = None,521        delay_seconds: int = None,522    ):523        if delay_seconds is not None:524            # in fifo queues, delay is only applied on queue level525            raise InvalidParameterValue(526                f"Value {delay_seconds} for parameter DelaySeconds is invalid. 
Reason: The request include parameter "527                f"that is not valid for this queue type."528            )529        if not message_group_id:530            raise MissingParameter("The request must contain the parameter MessageGroupId.")531        dedup_id = message_deduplication_id532        content_based_deduplication = (533            "true"534            == (self.attributes.get(QueueAttributeName.ContentBasedDeduplication, "false")).lower()535        )536        if not dedup_id and content_based_deduplication:537            dedup_id = hashlib.sha256(message.get("Body").encode("utf-8")).hexdigest()538        if not dedup_id:539            raise InvalidParameterValue(540                "The Queue should either have ContentBasedDeduplication enabled or MessageDeduplicationId provided "541                "explicitly "542            )543        fifo_message = SqsMessage(544            time.time(),545            message,546            message_deduplication_id=dedup_id,547            message_group_id=message_group_id,548        )549        if visibility_timeout is not None:550            fifo_message.visibility_timeout = visibility_timeout551        else:552            # use the attribute from the queue553            fifo_message.visibility_timeout = self.visibility_timeout554        if delay_seconds is not None:555            fifo_message.delay_seconds = delay_seconds556        else:557            fifo_message.delay_seconds = self.delay_seconds558        original_message = None559        original_message_group = self.deduplication.get(message_group_id)560        if original_message_group:561            original_message = original_message_group.get(dedup_id)562        if (563            original_message564            and not original_message.deleted565            and original_message.priority + DEDUPLICATION_INTERVAL_IN_SEC > fifo_message.priority566        ):567            message["MessageId"] = original_message.message["MessageId"]568        else:569         
   if fifo_message.is_delayed:570                self.delayed.add(fifo_message)571            else:572                self.visible.put_nowait(fifo_message)573            if not original_message_group:574                self.deduplication[message_group_id] = {}575            self.deduplication[message_group_id][dedup_id] = fifo_message576    def _assert_queue_name(self, name):577        if not name.endswith(".fifo"):578            raise InvalidParameterValue(579                "The name of a FIFO queue can only include alphanumeric characters, hyphens, or underscores, "580                "must end with .fifo suffix and be 1 to 80 in length"581            )582        # The .fifo suffix counts towards the 80-character queue name quota.583        queue_name = name[:-5] + "_fifo"584        super()._assert_queue_name(queue_name)585    def validate_queue_attributes(self, attributes):586        valid = [k[1] for k in inspect.getmembers(QueueAttributeName)]587        for k in attributes.keys():588            if k not in valid:589                raise InvalidAttributeName(f"Unknown Attribute {k}.")590        # Special Cases591        fifo = attributes.get(QueueAttributeName.FifoQueue)592        if fifo and fifo.lower() != "true":593            raise InvalidAttributeValue(594                "Invalid value for the parameter FifoQueue. Reason: Modifying queue type is not supported."595            )596    # TODO: If we ever actually need to do something with this number, it needs to be part of597    #   SQSMessage. 
This means changing all *put*() signatures to return the saved message.598    def generate_sequence_number(self):599        return _create_mock_sequence_number()600class QueueUpdateWorker:601    """602    Regularly re-queues inflight and delayed messages whose visibility timeout has expired or delay deadline has been603    reached.604    """605    def __init__(self) -> None:606        super().__init__()607        self.scheduler = Scheduler()608        self.thread: Optional[FuncThread] = None609        self.mutex = threading.RLock()610    def do_update_all_queues(self):611        for region in SqsBackend.regions().keys():612            backend = SqsBackend.get(region)613            for queue in backend.queues.values():614                try:615                    queue.requeue_inflight_messages()616                except Exception:617                    LOG.exception("error re-queueing inflight messages")618                try:619                    queue.enqueue_delayed_messages()620                except Exception:621                    LOG.exception("error enqueueing delayed messages")622    def start(self):623        with self.mutex:624            if self.thread:625                return626            self.scheduler = Scheduler()627            self.scheduler.schedule(self.do_update_all_queues, period=1)628            def _run(*_args):629                self.scheduler.run()630            self.thread = start_thread(_run)631    def stop(self):632        with self.mutex:633            if self.scheduler:...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
