Best Python code snippet using localstack_python
provider.py
Source:provider.py  
# ... (the earlier part of provider.py is omitted from this excerpt)


class MissingParameter(CommonServiceException):
    """Service exception with code ``MissingParameter`` and HTTP status 400."""

    def __init__(self, message):
        super().__init__("MissingParameter", message, 400, True)


@singleton_factory
def global_message_sequence():
    """
    Creates the counter that ``FifoQueue.next_sequence_number`` draws sequence numbers from.

    NOTE(review): ``singleton_factory`` presumably caches the returned counter so that all callers share it - confirm.

    :return: an ``itertools.count`` starting at a value derived from the current time
    """
    # creates a 20-digit number used as the start for the global sequence
    start = int(time.time()) << 33
    # itertools.count is thread safe over the GIL since its getAndIncrement operation is a single python bytecode op
    return itertools.count(start)


def generate_message_id():
    """Returns a new message id (delegates to ``long_uid``)."""
    return long_uid()


def assert_queue_name(queue_name: str, fifo: bool = False):
    """
    Validates a queue name.

    :param queue_name: the name to check
    :param fifo: whether the name belongs to a FIFO queue (only then is the ``.fifo`` suffix accepted)
    :raises InvalidParameterValue: if the name contains invalid characters, has an invalid length, or carries a
        ``.fifo`` suffix although ``fifo`` is false
    """
    if queue_name.endswith(".fifo"):
        if not fifo:
            # Standard queues with .fifo suffix are not allowed
            raise InvalidParameterValue(
                "Can only include alphanumeric characters, hyphens, or underscores. 1 to 80 in length"
            )
        # The .fifo suffix counts towards the 80-character queue name quota.
        queue_name = queue_name[:-5] + "_fifo"

    # slashes are actually not allowed, but we've allowed it explicitly in localstack
    if not re.match(r"^[a-zA-Z0-9/_-]{1,80}$", queue_name):
        raise InvalidParameterValue(
            "Can only include alphanumeric characters, hyphens, or underscores. 1 to 80 in length"
        )


def check_message_size(message_body: str, max_message_size: int):
    """
    Checks that the UTF-8 encoded message body does not exceed ``max_message_size`` bytes.

    :param message_body: the message body
    :param max_message_size: the maximum number of bytes allowed
    :raises InvalidParameterValue: if the encoded body is larger than ``max_message_size``
    """
    # https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/quotas-messages.html
    error = "One or more parameters are invalid. "
    error += f"Reason: Message must be shorter than {max_message_size} bytes."
    # must encode as utf8 to get correct bytes with len
    if len(message_body.encode("utf8")) > max_message_size:
        raise InvalidParameterValue(error)


def check_message_content(message_body: str):
    """
    Checks the message body against ``MSG_CONTENT_REGEX``.

    :param message_body: the message body
    :raises InvalidMessageContents: if the body does not match the regex
    """
    error = "Invalid characters found. Valid unicode characters are #x9 | #xA | #xD | #x20 to #xD7FF | #xE000 to #xFFFD | #x10000 to #x10FFFF"
    if not re.match(MSG_CONTENT_REGEX, message_body):
        raise InvalidMessageContents(error)


def encode_receipt_handle(queue_arn, message: "SqsMessage") -> str:
    """
    Builds a base64-encoded receipt handle from a random uid, the queue ARN, the message id and the message's
    last-received timestamp (space separated).

    :param queue_arn: ARN of the queue the message was received from
    :param message: the received message
    :return: the encoded receipt handle
    """
    # http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/ImportantIdentifiers.html#ImportantIdentifiers-receipt-handles
    # encode the queue arn in the receipt handle, so we can later check if it belongs to the queue
    # but also add some randomness s.t. the generated receipt handles look like the ones from AWS
    handle = f"{long_uid()} {queue_arn} {message.message.get('MessageId')} {message.last_received}"
    encoded = base64.b64encode(handle.encode("utf-8"))
    return encoded.decode("utf-8")


def decode_receipt_handle(receipt_handle: str) -> str:
    """
    Decodes a receipt handle created by ``encode_receipt_handle`` and returns the queue ARN stored in it.

    :param receipt_handle: the base64-encoded receipt handle
    :return: the queue ARN contained in the handle
    :raises ReceiptHandleIsInvalid: if the handle cannot be decoded, does not consist of exactly four
        space-separated parts, or does not contain a valid ARN
    """
    try:
        handle = base64.b64decode(receipt_handle).decode("utf-8")
        # only the ARN is needed; the unpacking still enforces the four-part layout
        _, queue_arn, _message_id, _last_received = handle.split(" ")
        parse_arn(queue_arn)  # raises a ValueError if it is not an arn
        return queue_arn
    except (IndexError, ValueError):
        raise ReceiptHandleIsInvalid(
            f'The input receipt handle "{receipt_handle}" is not a valid receipt handle.'
        )


class Permission(NamedTuple):
    # TODO: just a placeholder for real policies
    label: str
    account_id: str
    action: str


class SqsMessage:
    """
    A message held by an ``SqsQueue`` together with its bookkeeping state (receive counters, receipt handles,
    visibility and delay information).

    Ordering comparisons use ``priority``; equality and hashing use the ``MessageId`` of the wrapped message.
    """

    message: Message
    created: float
    visibility_timeout: int
    receive_times: int
    delay_seconds: Optional[int]
    receipt_handles: Set[str]
    last_received: Optional[float]
    first_received: Optional[float]
    visibility_deadline: Optional[float]
    deleted: bool
    priority: float
    message_deduplication_id: str
    message_group_id: str
    sequence_number: str

    def __init__(
        self,
        priority: float,
        message: Message,
        message_deduplication_id: Optional[str] = None,
        message_group_id: Optional[str] = None,
        sequence_number: Optional[str] = None,
    ) -> None:
        """
        :param priority: value used to order messages in the visible queue
        :param message: the message to wrap; its ``Attributes`` entry is created or updated in place
        :param message_deduplication_id: stored as the ``MessageDeduplicationId`` attribute if given
        :param message_group_id: stored as the ``MessageGroupId`` attribute if given
        :param sequence_number: stored as the ``SequenceNumber`` attribute if given
        """
        self.created = time.time()
        self.message = message
        self.receive_times = 0
        self.receipt_handles = set()

        self.delay_seconds = None
        self.last_received = None
        self.first_received = None
        # must exist before the first receive, otherwise ``is_visible`` raises an AttributeError
        self.visibility_deadline = None
        self.deleted = False
        self.priority = priority
        self.sequence_number = sequence_number

        attributes = {}
        if message_group_id is not None:
            attributes["MessageGroupId"] = message_group_id
        if message_deduplication_id is not None:
            attributes["MessageDeduplicationId"] = message_deduplication_id
        if sequence_number is not None:
            attributes["SequenceNumber"] = sequence_number

        if self.message.get("Attributes"):
            self.message["Attributes"].update(attributes)
        else:
            self.message["Attributes"] = attributes

    @property
    def message_group_id(self) -> Optional[str]:
        """The ``MessageGroupId`` attribute of the message, or None if it is not set."""
        return self.message["Attributes"].get("MessageGroupId")

    @property
    def message_deduplication_id(self) -> Optional[str]:
        """The ``MessageDeduplicationId`` attribute of the message, or None if it is not set."""
        return self.message["Attributes"].get("MessageDeduplicationId")

    def set_last_received(self, timestamp: float):
        """
        Sets the last received timestamp of the message to the given value, and updates the visibility deadline
        accordingly.

        :param timestamp: the last time the message was received
        """
        self.last_received = timestamp
        self.visibility_deadline = timestamp + self.visibility_timeout

    def update_visibility_timeout(self, timeout: int):
        """
        Sets the visibility timeout of the message to the given value, and updates the visibility deadline accordingly.

        :param timeout: the timeout value in seconds
        """
        self.visibility_timeout = timeout
        self.visibility_deadline = time.time() + timeout

    @property
    def is_visible(self) -> bool:
        """
        Returns false if the message has a visibility deadline that is in the future.

        :return: whether the message is visible or not.
        """
        if self.visibility_deadline is None:
            return True
        return time.time() >= self.visibility_deadline

    @property
    def is_delayed(self) -> bool:
        """Whether the message has a delay that has not yet elapsed since its creation."""
        if self.delay_seconds is None:
            return False
        return time.time() <= self.created + self.delay_seconds

    def __gt__(self, other):
        return self.priority > other.priority

    def __ge__(self, other):
        return self.priority >= other.priority

    def __lt__(self, other):
        return self.priority < other.priority

    def __le__(self, other):
        return self.priority <= other.priority

    def __eq__(self, other):
        # comparing with a foreign type must not blow up with an AttributeError
        if not isinstance(other, SqsMessage):
            return NotImplemented
        return self.message["MessageId"] == other.message["MessageId"]

    def __hash__(self):
        return hash(self.message["MessageId"])


class SqsQueue:
    """
    Base class of the queue implementations. Messages live in one of three containers: ``visible`` (a priority
    queue of receivable messages), ``delayed`` and ``inflight``. ``receipts`` maps receipt handles to messages.
    Subclasses implement ``put``.
    """

    name: str
    region: str
    account_id: str

    attributes: QueueAttributeMap
    tags: TagMap
    permissions: Set[Permission]

    purge_in_progress: bool
    purge_timestamp: Optional[float]

    visible: PriorityQueue
    delayed: Set[SqsMessage]
    inflight: Set[SqsMessage]
    receipts: Dict[str, SqsMessage]

    def __init__(self, name: str, region: str, account_id: str, attributes=None, tags=None) -> None:
        """
        :param name: the queue name (validated with ``_assert_queue_name``)
        :param region: the region used in the queue ARN
        :param account_id: the account id used in the queue ARN and URL
        :param attributes: attribute values that override the defaults
        :param tags: initial queue tags
        :raises InvalidParameterValue: if the name is not valid
        """
        self.name = name
        self.region = region
        self.account_id = account_id

        self._assert_queue_name(name)
        self.tags = tags or {}

        self.visible = PriorityQueue()
        self.delayed = set()
        self.inflight = set()
        self.receipts = {}

        self.attributes = self.default_attributes()
        if attributes:
            self.attributes.update(attributes)

        self.purge_in_progress = False
        self.purge_timestamp = None

        self.permissions = set()
        self.mutex = threading.RLock()

    def default_attributes(self) -> QueueAttributeMap:
        """
        Returns the default attributes of the queue. The three ``ApproximateNumberOf...`` entries are callables
        that compute the current value when invoked.
        """
        return {
            QueueAttributeName.ApproximateNumberOfMessages: lambda: self.visible._qsize(),
            QueueAttributeName.ApproximateNumberOfMessagesNotVisible: lambda: len(self.inflight),
            QueueAttributeName.ApproximateNumberOfMessagesDelayed: lambda: len(self.delayed),
            QueueAttributeName.CreatedTimestamp: str(now()),
            QueueAttributeName.DelaySeconds: "0",
            QueueAttributeName.LastModifiedTimestamp: str(now()),
            QueueAttributeName.MaximumMessageSize: str(DEFAULT_MAXIMUM_MESSAGE_SIZE),
            QueueAttributeName.MessageRetentionPeriod: "345600",
            QueueAttributeName.QueueArn: self.arn,
            QueueAttributeName.ReceiveMessageWaitTimeSeconds: "0",
            QueueAttributeName.VisibilityTimeout: "30",
            QueueAttributeName.SqsManagedSseEnabled: "false",
        }

    def update_delay_seconds(self, value: int):
        """
        For standard queues, the per-queue delay setting is not retroactive—changing the setting doesn't affect the delay of messages already in the queue.
        For FIFO queues, the per-queue delay setting is retroactive—changing the setting affects the delay of messages already in the queue.

        https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-delay-queues.html

        :param value: the number of seconds
        """
        self.attributes[QueueAttributeName.DelaySeconds] = str(value)

    def update_last_modified(self, timestamp: Optional[int] = None):
        """
        Updates the ``LastModifiedTimestamp`` attribute.

        :param timestamp: the timestamp to store; defaults to ``now()``
        """
        if timestamp is None:
            timestamp = now()
        self.attributes[QueueAttributeName.LastModifiedTimestamp] = str(timestamp)

    @property
    def arn(self) -> str:
        """The queue ARN built from region, account id and name."""
        return f"arn:aws:sqs:{self.region}:{self.account_id}:{self.name}"

    def url(self, context: RequestContext) -> str:
        """Return queue URL using either SQS_PORT_EXTERNAL (if configured), the SQS_ENDPOINT_STRATEGY (if configured)
        or based on the 'Host' request header"""

        host_url = context.request.host_url

        if config.SQS_ENDPOINT_STRATEGY == "domain":
            # queue.localhost.localstack.cloud:4566/000000000000/my-queue (us-east-1)
            # or us-east-2.queue.localhost.localstack.cloud:4566/000000000000/my-queue
            region = "" if self.region == "us-east-1" else self.region + "."
            scheme = context.request.scheme
            host_url = f"{scheme}://{region}queue.{constants.LOCALHOST_HOSTNAME}:{config.EDGE_PORT}"
        elif config.SQS_ENDPOINT_STRATEGY == "path":
            # https?://localhost:4566/queue/us-east-1/00000000000/my-queue (us-east-1)
            # the request's host_url ends with a slash; strip it to avoid "//queue/..." in the result
            host_url = f"{host_url.rstrip('/')}/queue/{self.region}"
        else:
            if config.SQS_PORT_EXTERNAL:
                host_url = external_service_url("sqs")

        return "{host}/{account_id}/{name}".format(
            host=host_url.rstrip("/"),
            account_id=self.account_id,
            name=self.name,
        )

    @property
    def visibility_timeout(self) -> int:
        """The ``VisibilityTimeout`` attribute as an int."""
        return int(self.attributes[QueueAttributeName.VisibilityTimeout])

    @property
    def delay_seconds(self) -> int:
        """The ``DelaySeconds`` attribute as an int."""
        return int(self.attributes[QueueAttributeName.DelaySeconds])

    @property
    def wait_time_seconds(self) -> int:
        """The ``ReceiveMessageWaitTimeSeconds`` attribute as an int."""
        return int(self.attributes[QueueAttributeName.ReceiveMessageWaitTimeSeconds])

    @property
    def maximum_message_size(self):
        """The ``MaximumMessageSize`` attribute as an int."""
        return int(self.attributes[QueueAttributeName.MaximumMessageSize])

    def validate_receipt_handle(self, receipt_handle: str):
        """
        Checks that the receipt handle is well-formed and was issued for this queue.

        :param receipt_handle: the receipt handle to check
        :raises ReceiptHandleIsInvalid: if the handle is malformed or contains a different queue ARN
        """
        if self.arn != decode_receipt_handle(receipt_handle):
            raise ReceiptHandleIsInvalid(
                f'The input receipt handle "{receipt_handle}" is not a valid receipt handle.'
            )

    def update_visibility_timeout(self, receipt_handle: str, visibility_timeout: int):
        """
        Changes the visibility timeout of the in-flight message identified by the receipt handle. A timeout of 0
        moves the message back into the visible queue immediately.

        :param receipt_handle: receipt handle of the message
        :param visibility_timeout: the new timeout in seconds
        :raises ReceiptHandleIsInvalid: if the handle is malformed or belongs to another queue
        :raises InvalidParameterValue: if the handle is not known to this queue
        :raises MessageNotInflight: if the message is not in flight
        """
        with self.mutex:
            self.validate_receipt_handle(receipt_handle)

            if receipt_handle not in self.receipts:
                raise InvalidParameterValue(
                    f"Value {receipt_handle} for parameter ReceiptHandle is invalid. Reason: Message does not exist "
                    f"or is not available for visibility timeout change."
                )

            standard_message = self.receipts[receipt_handle]

            if standard_message not in self.inflight:
                raise MessageNotInflight()

            standard_message.update_visibility_timeout(visibility_timeout)

            if visibility_timeout == 0:
                LOG.info(
                    "terminating the visibility timeout of %s",
                    standard_message.message["MessageId"],
                )
                # Terminating the visibility timeout for a message
                # https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html#terminating-message-visibility-timeout
                self.inflight.remove(standard_message)
                self.visible.put_nowait(standard_message)

    def remove(self, receipt_handle: str):
        """
        Deletes the message identified by the receipt handle. Unknown (but well-formed) handles are ignored.

        :param receipt_handle: receipt handle of the message to delete
        :raises ReceiptHandleIsInvalid: if the handle is malformed or belongs to another queue
        """
        with self.mutex:
            self.validate_receipt_handle(receipt_handle)

            if receipt_handle not in self.receipts:
                LOG.debug(
                    "no in-flight message found for receipt handle %s in queue %s",
                    receipt_handle,
                    self.arn,
                )
                return

            standard_message = self.receipts[receipt_handle]
            standard_message.deleted = True
            LOG.debug(
                "deleting message %s from queue %s",
                standard_message.message["MessageId"],
                self.arn,
            )

            # remove all handles
            for handle in standard_message.receipt_handles:
                self.receipts.pop(handle, None)
            standard_message.receipt_handles.clear()

            # remove in-flight message
            try:
                self.inflight.remove(standard_message)
            except KeyError:
                # this means the message was re-queued in the meantime: a message can be removed with an old
                # receipt handle that was issued before the message was put back in the visible queue.
                try:
                    self.visible.queue.remove(standard_message)
                except ValueError:
                    # the message is in neither container (e.g. it has just been de-queued by ``get``); the
                    # ``deleted`` flag set above makes ``get`` skip it, so there is nothing left to do
                    pass
                else:
                    # list.remove broke the heap invariant of the priority queue's backing list
                    heapq.heapify(self.visible.queue)

    def put(
        self,
        message: Message,
        visibility_timeout: Optional[int] = None,
        message_deduplication_id: Optional[str] = None,
        message_group_id: Optional[str] = None,
        delay_seconds: Optional[int] = None,
    ) -> SqsMessage:
        """Adds a message to the queue. Must be implemented by subclasses."""
        raise NotImplementedError

    def get(self, block=True, timeout=None, visibility_timeout: Optional[int] = None) -> SqsMessage:
        """
        Takes the next message from the visible queue, registers a new receipt handle for it, and returns a deep
        copy of the message that carries the receipt handle and the receive-related attributes. Messages marked
        as deleted are skipped.

        :param block: passed to ``PriorityQueue.get``
        :param timeout: passed to ``PriorityQueue.get``; when deleted messages are skipped, only the remaining
            time is used for the following attempts
        :param visibility_timeout: visibility timeout for this receive; defaults to the queue's timeout
        :return: a copy of the received message
        :raises queue.Empty: propagated from ``PriorityQueue.get`` when no message becomes available
        """
        # an absolute deadline keeps the overall wait bounded by ``timeout`` even if several deleted messages
        # are skipped; ``timeout=None`` (wait forever / non-blocking) needs no bookkeeping at all
        deadline = None if timeout is None else time.time() + timeout

        while True:
            standard_message: SqsMessage = self.visible.get(block=block, timeout=timeout)
            LOG.debug(
                "de-queued message %s from %s", standard_message.message["MessageId"], self.arn
            )

            with self.mutex:
                if standard_message.deleted:
                    # TODO: check what the behavior of AWS is here. should we return a deleted message?
                    if deadline is not None:
                        timeout = max(0.0, deadline - time.time())
                    continue

                # update message attributes
                standard_message.visibility_timeout = (
                    self.visibility_timeout if visibility_timeout is None else visibility_timeout
                )
                standard_message.receive_times += 1
                standard_message.set_last_received(time.time())
                if standard_message.first_received is None:
                    standard_message.first_received = standard_message.last_received

                # create and manage receipt handle
                receipt_handle = self.create_receipt_handle(standard_message)
                standard_message.receipt_handles.add(receipt_handle)
                self.receipts[receipt_handle] = standard_message

                if standard_message.visibility_timeout == 0:
                    self.visible.put_nowait(standard_message)
                else:
                    self.inflight.add(standard_message)

            # prepare message for receiver
            copied_message = copy.deepcopy(standard_message)
            copied_message.message["Attributes"][
                MessageSystemAttributeName.ApproximateReceiveCount
            ] = str(standard_message.receive_times)
            copied_message.message["Attributes"][
                MessageSystemAttributeName.ApproximateFirstReceiveTimestamp
            ] = str(int(standard_message.first_received * 1000))
            copied_message.message["ReceiptHandle"] = receipt_handle
            return copied_message

    def clear(self):
        """
        Calls clear on all internal datastructures that hold messages and data related to them.
        """
        with self.mutex:
            self.visible.queue.clear()
            self.inflight.clear()
            self.delayed.clear()
            self.receipts.clear()

    def create_receipt_handle(self, message: SqsMessage) -> str:
        """Creates a receipt handle for the message that encodes this queue's ARN."""
        return encode_receipt_handle(self.arn, message)

    def requeue_inflight_messages(self):
        """Moves all in-flight messages whose visibility deadline has passed back into the visible queue."""
        if not self.inflight:
            return

        with self.mutex:
            messages = [message for message in self.inflight if message.is_visible]
            for standard_message in messages:
                LOG.debug(
                    "re-queueing inflight messages %s into queue %s",
                    standard_message.message["MessageId"],
                    self.arn,
                )
                self.inflight.remove(standard_message)
                self.visible.put_nowait(standard_message)

    def enqueue_delayed_messages(self):
        """Moves all delayed messages whose delay has elapsed into the visible queue."""
        if not self.delayed:
            return

        with self.mutex:
            messages = [message for message in self.delayed if not message.is_delayed]
            for standard_message in messages:
                LOG.debug(
                    "enqueueing delayed messages %s into queue %s",
                    standard_message.message["MessageId"],
                    self.arn,
                )
                self.delayed.remove(standard_message)
                self.visible.put_nowait(standard_message)

    def _assert_queue_name(self, name):
        """
        Validates the queue name.

        :raises InvalidParameterValue: if the name contains invalid characters or has an invalid length
        """
        if not re.match(r"^[a-zA-Z0-9_-]{1,80}$", name):
            raise InvalidParameterValue(
                "Can only include alphanumeric characters, hyphens, or underscores. 1 to 80 in length"
            )

    def validate_queue_attributes(self, attributes):
        """
        Checks that every key of ``attributes`` is a member value of ``QueueAttributeName`` other than
        ``FifoQueue``.

        :param attributes: the attributes to validate
        :raises InvalidAttributeName: if an attribute is not in the list of valid attributes
        """
        # NOTE(review): ``k`` is a (name, value) tuple as returned by inspect.getmembers, so the membership test
        #  below only filters anything if INTERNAL_QUEUE_ATTRIBUTES contains such tuples - it looks like
        #  ``k[1] not in INTERNAL_QUEUE_ATTRIBUTES`` was intended; confirm against the constant's definition.
        valid = [
            k[1]
            for k in inspect.getmembers(QueueAttributeName)
            if k not in INTERNAL_QUEUE_ATTRIBUTES
        ]
        del valid[valid.index(QueueAttributeName.FifoQueue)]

        for k in attributes.keys():
            if k not in valid:
                raise InvalidAttributeName(f"Unknown Attribute {k}.")


class StandardQueue(SqsQueue):
    """Queue implementation that rejects the FIFO-only parameters and supports per-message delays."""

    def put(
        self,
        message: Message,
        visibility_timeout: Optional[int] = None,
        message_deduplication_id: Optional[str] = None,
        message_group_id: Optional[str] = None,
        delay_seconds: Optional[int] = None,
    ):
        """
        Adds a message to the queue, either to the delayed set or directly to the visible queue.

        :param message: the message to add
        :param visibility_timeout: visibility timeout of the message; defaults to the queue's timeout
        :param message_deduplication_id: must not be set for this queue type
        :param message_group_id: must not be set for this queue type
        :param delay_seconds: delay of the message; defaults to the queue's delay
        :return: the created ``SqsMessage``
        :raises InvalidParameterValue: if a deduplication id or a group id is passed
        """
        if message_deduplication_id:
            raise InvalidParameterValue(
                f"Value {message_deduplication_id} for parameter MessageDeduplicationId is invalid. Reason: The "
                f"request includes a parameter that is not valid for this queue type. "
            )
        if message_group_id:
            raise InvalidParameterValue(
                f"Value {message_group_id} for parameter MessageGroupId is invalid. Reason: The request includes a "
                f"parameter that is not valid for this queue type. "
            )

        standard_message = SqsMessage(time.time(), message)

        if visibility_timeout is not None:
            standard_message.visibility_timeout = visibility_timeout
        else:
            # use the attribute from the queue
            standard_message.visibility_timeout = self.visibility_timeout

        if delay_seconds is not None:
            standard_message.delay_seconds = delay_seconds
        else:
            standard_message.delay_seconds = self.delay_seconds

        if standard_message.is_delayed:
            self.delayed.add(standard_message)
        else:
            self.visible.put_nowait(standard_message)

        return standard_message


class FifoQueue(SqsQueue):
    """
    Queue implementation that requires a message group id and de-duplicates messages per group via
    ``deduplication`` (group id -> deduplication id -> message).
    """

    deduplication: Dict[str, Dict[str, SqsMessage]]

    def __init__(self, name: str, region: str, account_id: str, attributes=None, tags=None) -> None:
        super().__init__(name, region, account_id, attributes, tags)
        self.deduplication = {}

    def default_attributes(self) -> QueueAttributeMap:
        """Returns the base defaults extended by the FIFO-specific attributes."""
        return {
            **super().default_attributes(),
            QueueAttributeName.ContentBasedDeduplication: "false",
            QueueAttributeName.DeduplicationScope: "queue",
            QueueAttributeName.FifoThroughputLimit: "perQueue",
        }

    def update_delay_seconds(self, value: int):
        """
        Updates the queue's delay and applies it to all messages that are currently delayed.

        :param value: the number of seconds
        """
        super().update_delay_seconds(value)
        for message in self.delayed:
            message.delay_seconds = value

    def put(
        self,
        message: Message,
        visibility_timeout: Optional[int] = None,
        message_deduplication_id: Optional[str] = None,
        message_group_id: Optional[str] = None,
        delay_seconds: Optional[int] = None,
    ):
        """
        Adds a message to the queue unless a non-deleted message with the same deduplication id was added to the
        same message group within ``DEDUPLICATION_INTERVAL_IN_SEC``. In that case nothing is enqueued and the
        passed message gets the ``MessageId`` of the original message.

        :param message: the message to add
        :param visibility_timeout: visibility timeout of the message; defaults to the queue's timeout
        :param message_deduplication_id: the deduplication id; if missing and content-based deduplication is
            enabled, the SHA-256 hex digest of the message body is used
        :param message_group_id: the message group id (required)
        :param delay_seconds: must be None or 0 for this queue type
        :return: the ``SqsMessage`` created for this call
        :raises InvalidParameterValue: if a non-zero delay is passed, or if no deduplication id can be determined
        :raises MissingParameter: if no message group id is passed
        """
        if delay_seconds:
            # in fifo queues, delay is only applied on queue level. However, explicitly setting delay_seconds=0 is valid
            raise InvalidParameterValue(
                f"Value {delay_seconds} for parameter DelaySeconds is invalid. Reason: The request include parameter "
                f"that is not valid for this queue type."
            )

        if not message_group_id:
            raise MissingParameter("The request must contain the parameter MessageGroupId.")
        dedup_id = message_deduplication_id
        content_based_deduplication = (
            "true"
            == (self.attributes.get(QueueAttributeName.ContentBasedDeduplication, "false")).lower()
        )
        if not dedup_id and content_based_deduplication:
            dedup_id = hashlib.sha256(message.get("Body").encode("utf-8")).hexdigest()
        if not dedup_id:
            raise InvalidParameterValue(
                "The Queue should either have ContentBasedDeduplication enabled or MessageDeduplicationId provided "
                "explicitly "
            )

        fifo_message = SqsMessage(
            time.time(),
            message,
            message_deduplication_id=dedup_id,
            message_group_id=message_group_id,
            sequence_number=str(self.next_sequence_number()),
        )
        if visibility_timeout is not None:
            fifo_message.visibility_timeout = visibility_timeout
        else:
            # use the attribute from the queue
            fifo_message.visibility_timeout = self.visibility_timeout

        if delay_seconds is not None:
            fifo_message.delay_seconds = delay_seconds
        else:
            fifo_message.delay_seconds = self.delay_seconds

        original_message = None
        original_message_group = self.deduplication.get(message_group_id)
        if original_message_group:
            original_message = original_message_group.get(dedup_id)

        if (
            original_message
            and not original_message.deleted
            and original_message.priority + DEDUPLICATION_INTERVAL_IN_SEC > fifo_message.priority
        ):
            # duplicate within the deduplication interval: do not enqueue, report the original message id
            message["MessageId"] = original_message.message["MessageId"]
        else:
            if fifo_message.is_delayed:
                self.delayed.add(fifo_message)
            else:
                self.visible.put_nowait(fifo_message)

            if not original_message_group:
                self.deduplication[message_group_id] = {}
            self.deduplication[message_group_id][dedup_id] = fifo_message

        return fifo_message

    def _assert_queue_name(self, name):
        """
        Validates the name of a FIFO queue.

        :raises InvalidParameterValue: if the name lacks the ``.fifo`` suffix, contains invalid characters or has
            an invalid length
        """
        if not name.endswith(".fifo"):
            raise InvalidParameterValue(
                "The name of a FIFO queue can only include alphanumeric characters, hyphens, or underscores, "
                "must end with .fifo suffix and be 1 to 80 in length"
            )
        # The .fifo suffix counts towards the 80-character queue name quota.
        queue_name = name[:-5] + "_fifo"
        super()._assert_queue_name(queue_name)

    def validate_queue_attributes(self, attributes):
        """
        Checks that every key of ``attributes`` is a member value of ``QueueAttributeName`` and that the
        ``FifoQueue`` attribute, if present, is "true".

        :param attributes: the attributes to validate
        :raises InvalidAttributeName: if an attribute is not in the list of valid attributes
        :raises InvalidAttributeValue: if ``FifoQueue`` is set to a value other than "true"
        """
        # NOTE(review): ``k`` is a (name, value) tuple as returned by inspect.getmembers, so the membership test
        #  below only filters anything if INTERNAL_QUEUE_ATTRIBUTES contains such tuples - it looks like
        #  ``k[1] not in INTERNAL_QUEUE_ATTRIBUTES`` was intended; confirm against the constant's definition.
        valid = [
            k[1]
            for k in inspect.getmembers(QueueAttributeName)
            if k not in INTERNAL_QUEUE_ATTRIBUTES
        ]
        for k in attributes.keys():
            if k not in valid:
                raise InvalidAttributeName(f"Unknown Attribute {k}.")
        # Special Cases
        fifo = attributes.get(QueueAttributeName.FifoQueue)
        if fifo and fifo.lower() != "true":
            raise InvalidAttributeValue(
                "Invalid value for the parameter FifoQueue. Reason: Modifying queue type is not supported."
            )

    def next_sequence_number(self):
        """Returns the next value of the counter provided by ``global_message_sequence``."""
        return next(global_message_sequence())


class QueueUpdateWorker:
    """
    Regularly re-queues inflight and delayed messages whose visibility timeout has expired or delay deadline has been
    reached.
    """

    def __init__(self) -> None:
        super().__init__()
        self.scheduler = Scheduler()
        self.thread: Optional[FuncThread] = None
        self.mutex = threading.RLock()

    def do_update_all_queues(self):
        for region in SqsBackend.regions().keys():
            backend = SqsBackend.get(region)
            for queue in backend.queues.values():
                ...  # the excerpt is truncated here; the remainder of provider.py is not shown


# Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
