Best Python code snippet using localstack_python
provider.py
Source:provider.py  
...131    # but also add some randomness s.t. the generated receipt handles look like the ones from AWS132    handle = f"{long_uid()} {queue_arn} {message.message.get('MessageId')} {message.last_received}"133    encoded = base64.b64encode(handle.encode("utf-8"))134    return encoded.decode("utf-8")135def decode_receipt_handle(receipt_handle: str) -> str:136    try:137        handle = base64.b64decode(receipt_handle).decode("utf-8")138        _, queue_arn, message_id, last_received = handle.split(" ")139        parse_arn(queue_arn)  # raises a ValueError if it is not an arn140        return queue_arn141    except (IndexError, ValueError):142        raise ReceiptHandleIsInvalid(143            f'The input receipt handle "{receipt_handle}" is not a valid receipt handle.'144        )145class Permission(NamedTuple):146    # TODO: just a placeholder for real policies147    label: str148    account_id: str149    action: str150class SqsMessage:151    message: Message152    created: float153    visibility_timeout: int154    receive_times: int155    delay_seconds: Optional[int]156    receipt_handles: Set[str]157    last_received: Optional[float]158    first_received: Optional[float]159    visibility_deadline: Optional[float]160    deleted: bool161    priority: float162    message_deduplication_id: str163    message_group_id: str164    def __init__(165        self,166        priority: float,167        message: Message,168        message_deduplication_id: str = None,169        message_group_id: str = None,170    ) -> None:171        self.created = time.time()172        self.message = message173        self.receive_times = 0174        self.receipt_handles = set()175        self.delay_seconds = None176        self.last_received = None177        self.first_received = None178        self.deleted = False179        self.priority = priority180        attributes = {}181        if message_group_id is not None:182            attributes["MessageGroupId"] = message_group_id183        if message_deduplication_id is not None:184            attributes["MessageDeduplicationId"] = message_deduplication_id185        if self.message.get("Attributes"):186            self.message["Attributes"].update(attributes)187        else:188            self.message["Attributes"] = attributes189    @property190    def message_group_id(self) -> Optional[str]:191        return self.message["Attributes"].get("MessageGroupId")192    @property193    def message_deduplication_id(self) -> Optional[str]:194        return self.message["Attributes"].get("MessageDeduplicationId")195    def set_last_received(self, timestamp: float):196        """197        Sets the last received timestamp of the message to the given value, and updates the visibility deadline198        accordingly.199        :param timestamp: the last time the message was received200        """201        self.last_received = timestamp202        self.visibility_deadline = timestamp + self.visibility_timeout203    def update_visibility_timeout(self, timeout: int):204        """205        Sets the visibility timeout of the message to the given value, and updates the visibility deadline accordingly.206        :param timeout: the timeout value in seconds207        """208        self.visibility_timeout = timeout209        self.visibility_deadline = time.time() + timeout210    @property211    def is_visible(self) -> bool:212        """213        Returns false if the message has a visibility deadline that is in the future.214        :return: whether the message is visibile or not.215        """216        if self.visibility_deadline is None:217            return True218        if time.time() >= self.visibility_deadline:219            return True220        return False221    @property222    def is_delayed(self) -> bool:223        if self.delay_seconds is None:224            return False225        return time.time() <= self.created + self.delay_seconds226    def __gt__(self, other):227        return self.priority > other.priority228    def __ge__(self, other):229        return self.priority >= other.priority230    def __lt__(self, other):231        return self.priority < other.priority232    def __le__(self, other):233        return self.priority <= other.priority234    def __eq__(self, other):235        return self.message["MessageId"] == other.message["MessageId"]236    def __hash__(self):237        return self.message["MessageId"].__hash__()238class SqsQueue:239    name: str240    region: str241    account_id: str242    attributes: QueueAttributeMap243    tags: TagMap244    permissions: Set[Permission]245    purge_in_progress: bool246    visible: PriorityQueue247    delayed: Set[SqsMessage]248    inflight: Set[SqsMessage]249    receipts: Dict[str, SqsMessage]250    def __init__(self, name: str, region: str, account_id: str, attributes=None, tags=None) -> None:251        self.name = name252        self.region = region253        self.account_id = account_id254        self._assert_queue_name(name)255        self.tags = tags or {}256        self.visible = PriorityQueue()257        self.delayed = set()258        self.inflight = set()259        self.receipts = {}260        self.attributes = self.default_attributes()261        if attributes:262            self.attributes.update(attributes)263        self.purge_in_progress = False264        self.permissions = set()265        self.mutex = threading.RLock()266    def default_attributes(self) -> QueueAttributeMap:267        return {268            QueueAttributeName.ApproximateNumberOfMessages: self.visible._qsize,269            QueueAttributeName.ApproximateNumberOfMessagesNotVisible: lambda: len(self.inflight),270            QueueAttributeName.ApproximateNumberOfMessagesDelayed: lambda: len(self.delayed),271            QueueAttributeName.CreatedTimestamp: str(now()),272            QueueAttributeName.DelaySeconds: "0",273            QueueAttributeName.LastModifiedTimestamp: str(now()),274            QueueAttributeName.MaximumMessageSize: "262144",275            QueueAttributeName.MessageRetentionPeriod: "345600",276            QueueAttributeName.QueueArn: self.arn,277            QueueAttributeName.ReceiveMessageWaitTimeSeconds: "0",278            QueueAttributeName.VisibilityTimeout: "30",279            QueueAttributeName.SqsManagedSseEnabled: "false",280        }281    def update_delay_seconds(self, value: int):282        """283        For standard queues, the per-queue delay setting is not retroactiveâchanging the setting doesn't affect the delay of messages already in the queue.284        For FIFO queues, the per-queue delay setting is retroactiveâchanging the setting affects the delay of messages already in the queue.285        https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-delay-queues.html286        :param value: the number of seconds287        """288        self.attributes[QueueAttributeName.DelaySeconds] = str(value)289    def update_last_modified(self, timestamp: int = None):290        if timestamp is None:291            timestamp = now()292        self.attributes[QueueAttributeName.LastModifiedTimestamp] = str(timestamp)293    @property294    def arn(self) -> str:295        return f"arn:aws:sqs:{self.region}:{self.account_id}:{self.name}"296    def url(self, context: RequestContext) -> str:297        """Return queue URL using either SQS_PORT_EXTERNAL (if configured), the SQS_ENDPOINT_STRATEGY (if configured)298        or based on the 'Host' request header"""299        host_url = context.request.host_url300        if config.SQS_ENDPOINT_STRATEGY == "domain":301            # queue.localhost.localstack.cloud:4566/000000000000/my-queue (us-east-1)302            # or us-east-2.queue.localhost.localstack.cloud:4566/000000000000/my-queue303            region = "" if self.region == "us-east-1" else self.region + "."304            scheme = context.request.scheme305            host_url = f"{scheme}://{region}queue.{constants.LOCALHOST_HOSTNAME}:{config.EDGE_PORT}"306        elif config.SQS_ENDPOINT_STRATEGY == "path":307            # https?://localhost:4566/queue/us-east-1/00000000000/my-queue (us-east-1)308            host_url = f"{context.request.host_url}/queue/{self.region}"309        else:310            if config.SQS_PORT_EXTERNAL:311                host_url = external_service_url("sqs")312        return "{host}/{account_id}/{name}".format(313            host=host_url.rstrip("/"),314            account_id=self.account_id,315            name=self.name,316        )317    @property318    def visibility_timeout(self) -> int:319        return int(self.attributes[QueueAttributeName.VisibilityTimeout])320    @property321    def delay_seconds(self) -> int:322        return int(self.attributes[QueueAttributeName.DelaySeconds])323    @property324    def wait_time_seconds(self) -> int:325        return int(self.attributes[QueueAttributeName.ReceiveMessageWaitTimeSeconds])326    def validate_receipt_handle(self, receipt_handle: str):327        if self.arn != decode_receipt_handle(receipt_handle):328            raise ReceiptHandleIsInvalid(329                f'The input receipt handle "{receipt_handle}" is not a valid receipt handle.'330            )331    def update_visibility_timeout(self, receipt_handle: str, visibility_timeout: int):332        with self.mutex:333            self.validate_receipt_handle(receipt_handle)334            if receipt_handle not in self.receipts:335                raise InvalidParameterValue(336                    f"Value {receipt_handle} for parameter ReceiptHandle is invalid. Reason: Message does not exist "337                    f"or is not available for visibility timeout change."338                )339            standard_message = self.receipts[receipt_handle]340            if standard_message not in self.inflight:341                raise MessageNotInflight()...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
