Best Python code snippet using localstack_python
provider.py
Source:provider.py  
...695    deleted: Dict[str, float]696    def __init__(self):697        self.queues = {}698        self.deleted = {}699    def expire_deleted(self):700        for k in list(self.deleted.keys()):701            if self.deleted[k] <= (time.time() - RECENTLY_DELETED_TIMEOUT):702                del self.deleted[k]703class SqsProvider(SqsApi, ServiceLifecycleHook):704    """705    LocalStack SQS Provider.706    LIMITATIONS:707        - Pagination of results (NextToken)708        - Delivery guarantees709        - The region is not encoded in the queue URL710    """711    queues: Dict[str, SqsQueue]712    def __init__(self) -> None:713        super().__init__()714        self._mutex = threading.RLock()715        self._queue_update_worker = QueueUpdateWorker()716    def on_before_start(self):717        self._queue_update_worker.start()718    def on_before_stop(self):719        self._queue_update_worker.stop()720    def _require_queue(self, context: RequestContext, name: str) -> SqsQueue:721        """722        Returns the queue for the given name, or raises QueueDoesNotExist if it does not exist.723        :param: context: the request context724        :param name: the name to look for725        :returns: the queue726        :raises QueueDoesNotExist: if the queue does not exist727        """728        backend = SqsBackend.get(context.region)729        with self._mutex:730            if name not in backend.queues.keys():731                raise QueueDoesNotExist("The specified queue does not exist for this wsdl version.")732            return backend.queues[name]733    def _require_queue_by_arn(self, context: RequestContext, queue_arn: str) -> SqsQueue:734        arn = parse_arn(queue_arn)735        return self._require_queue(context, arn["resource"])736    def _resolve_queue(737        self,738        context: RequestContext,739        queue_name: Optional[str] = None,740        queue_url: Optional[str] = None,741    ) -> SqsQueue:742        """743        Determines the 
name of the queue from available information (request context, queue URL) to return the respective queue,744        or raises QueueDoesNotExist if it does not exist.745        :param context: the request context, used for getting region and account_id, and optionally the queue_url746        :param queue_name: the queue name (if this is set, then this will be used for the key)747        :param queue_url: the queue url (if name is not set, this will be used to determine the queue name)748        :returns: the queue749        :raises QueueDoesNotExist: if the queue does not exist750        """751        name = resolve_queue_name(context, queue_name, queue_url)752        return self._require_queue(context, name)753    def create_queue(754        self,755        context: RequestContext,756        queue_name: String,757        attributes: QueueAttributeMap = None,758        tags: TagMap = None,759    ) -> CreateQueueResult:760        fifo = attributes and (761            attributes.get(QueueAttributeName.FifoQueue, "false").lower() == "true"762        )763        # Special Case TODO: why is an emtpy policy passed at all? 
same in set_queue_attributes764        if attributes and attributes.get(QueueAttributeName.Policy) == "":765            del attributes[QueueAttributeName.Policy]766        backend = SqsBackend.get(context.region)767        with self._mutex:768            if queue_name in backend.queues:769                queue = backend.queues[queue_name]770                if attributes:771                    # if attributes are set, then we check whether the existing attributes match the passed ones772                    self._validate_queue_attributes(attributes)773                    for k, v in attributes.items():774                        if queue.attributes.get(k) != v:775                            LOG.debug(776                                "queue attribute values %s for queue %s do not match %s (existing) != %s (new)",777                                k,778                                queue_name,779                                queue.attributes.get(k),780                                v,781                            )782                            raise QueueNameExists(783                                f"A queue already exists with the same name and a different value for attribute {k}"784                            )785                return CreateQueueResult(QueueUrl=queue.url(context))786            if config.SQS_DELAY_RECENTLY_DELETED:787                deleted = backend.deleted.get(queue_name)788                if deleted and deleted > (time.time() - RECENTLY_DELETED_TIMEOUT):789                    raise QueueDeletedRecently(790                        "You must wait 60 seconds after deleting a queue before you can create "791                        "another with the same name."792                    )793            backend.expire_deleted()794            # create the appropriate queue795            if fifo:796                queue = FifoQueue(queue_name, context.region, context.account_id, attributes, tags)797            else:798                queue = 
StandardQueue(799                    queue_name, context.region, context.account_id, attributes, tags800                )801            LOG.debug("creating queue key=%s attributes=%s tags=%s", queue_name, attributes, tags)802            backend.queues[queue_name] = queue803        return CreateQueueResult(QueueUrl=queue.url(context))804    def get_queue_url(805        self, context: RequestContext, queue_name: String, queue_owner_aws_account_id: String = None806    ) -> GetQueueUrlResult:807        backend = SqsBackend.get(context.region)...redis_helper.py
Source:redis_helper.py  
1from app import user_store as u_store2EXPIRE_LOGS = 36003EXPIRE_DELETED = 3600 // 44# time for AWS to delete a pending subscription that hasn't been confirmed5EXPIRE_PENDING = 3600 * 24 * 36SESSION_LENGTH = 327ALL_PENDING_KEY = 'global:pending'8def _e(token: str) -> str:9    return f"endpoint:{token}"10def _dle(token: str) -> str:11    return f"endpoint:{token}:deleted"12def _pe(token: str) -> str:13    return f"endpoint:{token}:pending"14def get_logs_from_endpoint(token: str):15    r_key = _e(token)16    pipe = u_store.pipeline()17    rv, exp = pipe.lrange(r_key, 0, -1).expire(r_key, EXPIRE_LOGS).execute()18    if not rv:19        return None20    return rv21def get_len_logs_from_endpoint(token: str) -> int:22    r_key = _e(token)23    return u_store.llen(r_key)24def add_logs_to_endpoint(token: str, log_data: str):25    r_key = _e(token)26    pipe = u_store.pipeline()27    rv, exp = pipe.rpush(r_key, log_data).expire(r_key, EXPIRE_LOGS).execute()28    if not rv:29        return None30    return rv31def add_deleted_endpoint(token: str):32    r_key = _dle(token)33    return u_store.set(r_key, "1", ex=EXPIRE_DELETED)34def is_deleted_endpoint(token: str):35    r_key = _dle(token)36    return u_store.exists(r_key)37def add_endpoint_as_pending(token: str):38    r_key = _pe(token)39    pipe = u_store.pipeline()40    pipe.set(r_key, "1", ex=EXPIRE_PENDING).sadd(ALL_PENDING_KEY, token)41    return pipe.execute()42def del_endpoint_as_pending(token: str):43    r_key = _pe(token)44    pipe = u_store.pipeline()45    pipe.delete(r_key).srem(ALL_PENDING_KEY, token)46    return pipe.execute()47def get_all_pending():48    pending_list = [pending_endpoint for pending_endpoint in u_store.sscan_iter(ALL_PENDING_KEY)]49    pipe = u_store.pipeline()50    for pending_endpoint in pending_list:51        pipe.get(_pe(pending_endpoint))52    data_resp = pipe.execute()53    pending_endpoints = []54    expired = []55    for index, pending_endpoint_status in enumerate(data_resp):56        if 
pending_endpoint_status == "1":57            pending_endpoints.append(pending_list[index])58        else:59            expired.append(pending_list[index])60    if expired:61        u_store.srem(ALL_PENDING_KEY, *expired)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios, LambdaTest Learning Hubs compile a list of step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
