Best Python code snippet using localstack_python
provider.py
Source:provider.py  
...191        return self.message["Attributes"].get("MessageGroupId")192    @property193    def message_deduplication_id(self) -> Optional[str]:194        return self.message["Attributes"].get("MessageDeduplicationId")195    def set_last_received(self, timestamp: float):196        """197        Sets the last received timestamp of the message to the given value, and updates the visibility deadline198        accordingly.199        :param timestamp: the last time the message was received200        """201        self.last_received = timestamp202        self.visibility_deadline = timestamp + self.visibility_timeout203    def update_visibility_timeout(self, timeout: int):204        """205        Sets the visibility timeout of the message to the given value, and updates the visibility deadline accordingly.206        :param timeout: the timeout value in seconds207        """208        self.visibility_timeout = timeout209        self.visibility_deadline = time.time() + timeout210    @property211    def is_visible(self) -> bool:212        """213        Returns false if the message has a visibility deadline that is in the future.214        :return: whether the message is visible or not.215        """216        if self.visibility_deadline is None:217            return True218        if time.time() >= self.visibility_deadline:219            return True220        return False221    @property222    def is_delayed(self) -> bool:223        if self.delay_seconds is None:224            return False225        return time.time() <= self.created + self.delay_seconds226    def __gt__(self, other):227        return self.priority > other.priority228    def __ge__(self, other):229        return self.priority >= other.priority230    def __lt__(self, other):231        return self.priority < other.priority232    def __le__(self, other):233        return self.priority <= other.priority234    def __eq__(self, other):235        return self.message["MessageId"] == other.message["MessageId"]236    
def __hash__(self):237        return self.message["MessageId"].__hash__()238class SqsQueue:239    name: str240    region: str241    account_id: str242    attributes: QueueAttributeMap243    tags: TagMap244    permissions: Set[Permission]245    purge_in_progress: bool246    visible: PriorityQueue247    delayed: Set[SqsMessage]248    inflight: Set[SqsMessage]249    receipts: Dict[str, SqsMessage]250    def __init__(self, name: str, region: str, account_id: str, attributes=None, tags=None) -> None:251        self.name = name252        self.region = region253        self.account_id = account_id254        self._assert_queue_name(name)255        self.tags = tags or {}256        self.visible = PriorityQueue()257        self.delayed = set()258        self.inflight = set()259        self.receipts = {}260        self.attributes = self.default_attributes()261        if attributes:262            self.attributes.update(attributes)263        self.purge_in_progress = False264        self.permissions = set()265        self.mutex = threading.RLock()266    def default_attributes(self) -> QueueAttributeMap:267        return {268            QueueAttributeName.ApproximateNumberOfMessages: self.visible._qsize,269            QueueAttributeName.ApproximateNumberOfMessagesNotVisible: lambda: len(self.inflight),270            QueueAttributeName.ApproximateNumberOfMessagesDelayed: lambda: len(self.delayed),271            QueueAttributeName.CreatedTimestamp: str(now()),272            QueueAttributeName.DelaySeconds: "0",273            QueueAttributeName.LastModifiedTimestamp: str(now()),274            QueueAttributeName.MaximumMessageSize: "262144",275            QueueAttributeName.MessageRetentionPeriod: "345600",276            QueueAttributeName.QueueArn: self.arn,277            QueueAttributeName.ReceiveMessageWaitTimeSeconds: "0",278            QueueAttributeName.VisibilityTimeout: "30",279            QueueAttributeName.SqsManagedSseEnabled: "false",280        }281    def 
update_delay_seconds(self, value: int):282        """283        For standard queues, the per-queue delay setting is not retroactive—changing the setting doesn't affect the delay of messages already in the queue.284        For FIFO queues, the per-queue delay setting is retroactive—changing the setting affects the delay of messages already in the queue.285        https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-delay-queues.html286        :param value: the number of seconds287        """288        self.attributes[QueueAttributeName.DelaySeconds] = str(value)289    def update_last_modified(self, timestamp: int = None):290        if timestamp is None:291            timestamp = now()292        self.attributes[QueueAttributeName.LastModifiedTimestamp] = str(timestamp)293    @property294    def arn(self) -> str:295        return f"arn:aws:sqs:{self.region}:{self.account_id}:{self.name}"296    def url(self, context: RequestContext) -> str:297        """Return queue URL using either SQS_PORT_EXTERNAL (if configured), the SQS_ENDPOINT_STRATEGY (if configured)298        or based on the 'Host' request header"""299        host_url = context.request.host_url300        if config.SQS_ENDPOINT_STRATEGY == "domain":301            # queue.localhost.localstack.cloud:4566/000000000000/my-queue (us-east-1)302            # or us-east-2.queue.localhost.localstack.cloud:4566/000000000000/my-queue303            region = "" if self.region == "us-east-1" else self.region + "."304            scheme = context.request.scheme305            host_url = f"{scheme}://{region}queue.{constants.LOCALHOST_HOSTNAME}:{config.EDGE_PORT}"306        elif config.SQS_ENDPOINT_STRATEGY == "path":307            # https?://localhost:4566/queue/us-east-1/00000000000/my-queue (us-east-1)308            host_url = f"{context.request.host_url}/queue/{self.region}"309        else:310            if config.SQS_PORT_EXTERNAL:311                host_url = external_service_url("sqs")312        
return "{host}/{account_id}/{name}".format(313            host=host_url.rstrip("/"),314            account_id=self.account_id,315            name=self.name,316        )317    @property318    def visibility_timeout(self) -> int:319        return int(self.attributes[QueueAttributeName.VisibilityTimeout])320    @property321    def delay_seconds(self) -> int:322        return int(self.attributes[QueueAttributeName.DelaySeconds])323    @property324    def wait_time_seconds(self) -> int:325        return int(self.attributes[QueueAttributeName.ReceiveMessageWaitTimeSeconds])326    def validate_receipt_handle(self, receipt_handle: str):327        if self.arn != decode_receipt_handle(receipt_handle):328            raise ReceiptHandleIsInvalid(329                f'The input receipt handle "{receipt_handle}" is not a valid receipt handle.'330            )331    def update_visibility_timeout(self, receipt_handle: str, visibility_timeout: int):332        with self.mutex:333            self.validate_receipt_handle(receipt_handle)334            if receipt_handle not in self.receipts:335                raise InvalidParameterValue(336                    f"Value {receipt_handle} for parameter ReceiptHandle is invalid. 
Reason: Message does not exist "337                    f"or is not available for visibility timeout change."338                )339            standard_message = self.receipts[receipt_handle]340            if standard_message not in self.inflight:341                raise MessageNotInflight()342            standard_message.update_visibility_timeout(visibility_timeout)343            if visibility_timeout == 0:344                LOG.info(345                    "terminating the visibility timeout of %s",346                    standard_message.message["MessageId"],347                )348                # Terminating the visibility timeout for a message349                # https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html#terminating-message-visibility-timeout350                self.inflight.remove(standard_message)351                self.visible.put_nowait(standard_message)352    def remove(self, receipt_handle: str):353        with self.mutex:354            self.validate_receipt_handle(receipt_handle)355            if receipt_handle not in self.receipts:356                LOG.debug(357                    "no in-flight message found for receipt handle %s in queue %s",358                    receipt_handle,359                    self.arn,360                )361                return362            standard_message = self.receipts[receipt_handle]363            standard_message.deleted = True364            LOG.debug(365                "deleting message %s from queue %s",366                standard_message.message["MessageId"],367                self.arn,368            )369            # remove all handles370            for handle in standard_message.receipt_handles:371                del self.receipts[handle]372            standard_message.receipt_handles.clear()373            # remove in-flight message374            try:375                self.inflight.remove(standard_message)376            except KeyError:377              
  # this means the message was re-queued in the meantime378                # TODO: remove this message from the visible queue if it exists: a message can be removed with an old379                #  receipt handle that was issued before the message was put back in the visible queue.380                self.visible.queue.remove(standard_message)381                heapq.heapify(self.visible.queue)382                pass383    def put(384        self,385        message: Message,386        visibility_timeout: int = None,387        message_deduplication_id: str = None,388        message_group_id: str = None,389        delay_seconds: int = None,390    ):391        raise NotImplementedError392    def get(self, block=True, timeout=None, visibility_timeout: int = None) -> SqsMessage:393        start = time.time()394        while True:395            standard_message: SqsMessage = self.visible.get(block=block, timeout=timeout)396            LOG.debug(397                "de-queued message %s from %s", standard_message.message["MessageId"], self.arn398            )399            with self.mutex:400                if standard_message.deleted:401                    # TODO: check what the behavior of AWS is here. 
should we return a deleted message?402                    timeout -= time.time() - start403                    if timeout < 0:404                        timeout = 0405                    continue406                # update message attributes407                standard_message.visibility_timeout = (408                    self.visibility_timeout if visibility_timeout is None else visibility_timeout409                )410                standard_message.receive_times += 1411                standard_message.set_last_received(time.time())412                if standard_message.first_received is None:413                    standard_message.first_received = standard_message.last_received414                # create and manage receipt handle415                receipt_handle = self.create_receipt_handle(standard_message)416                standard_message.receipt_handles.add(receipt_handle)417                self.receipts[receipt_handle] = standard_message418                if standard_message.visibility_timeout == 0:419                    self.visible.put_nowait(standard_message)420                else:421                    self.inflight.add(standard_message)422            # prepare message for receiver423            copied_message = copy.deepcopy(standard_message)424            copied_message.message["Attributes"][425                MessageSystemAttributeName.ApproximateReceiveCount...context_helper.py
Source:context_helper.py  
...13    context.user_data.write('last_sent', curr_time)14    expired = curr_time - prev_time > threshold15    logger.info(f'is_session_expired = {expired}')16    return expired17def set_last_received(update, context):18    context.user_data.write(f'last_received {float(time.time() * 1000)}')19def is_possible_spam(update, context):20    t1 = int((datetime.utcnow().timestamp() + 0.5))21    t2 = parser.parse(update.date).timestamp()22    time_diff = t1 - t223    # print(update.message.date)24    # print(update.message.date.timestamp())25    # print(datetime.utcnow())26    # print(datetime.utcnow().timestamp())27    logger.info(f'time_diff = {time_diff}')28    if time_diff > Constants.SESSION_TIMEOUT_SECONDS:29        return True30    min_diff = Constants.MIN_DIFF_IS_SPAM31    prev_time = context.user_data.get('last_received')...subscriber.py
Source:subscriber.py  
...10    def is_last_received(self):11        with self.lock:12            value = self.done13        return value14    def set_last_received(self, value=False):15        with self.lock:16            self.done = value17    def get_id(self):18        return self.sid19    def listen_for_messages(self):20        print("Listening")21        while (not self.is_last_received()) or (self.new_messages.length() > 0):22            message_len = self.new_messages.length()23            if message_len == 0:24                continue25            message = self.new_messages.pop()26            print("%s %s" % (message, self.sid))27        print("Subscriber: %s Finished." % self.sid)28    def receive_message(self, message, last=False):29        self.new_messages.push(message)...Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
