How to use expire_deleted method in localstack

Best Python code snippet using localstack_python

provider.py

Source:provider.py Github

copy

Full Screen

...695 deleted: Dict[str, float]696 def __init__(self):697 self.queues = {}698 self.deleted = {}699 def expire_deleted(self):700 for k in list(self.deleted.keys()):701 if self.deleted[k] <= (time.time() - RECENTLY_DELETED_TIMEOUT):702 del self.deleted[k]703class SqsProvider(SqsApi, ServiceLifecycleHook):704 """705 LocalStack SQS Provider.706 LIMITATIONS:707 - Pagination of results (NextToken)708 - Delivery guarantees709 - The region is not encoded in the queue URL710 """711 queues: Dict[str, SqsQueue]712 def __init__(self) -> None:713 super().__init__()714 self._mutex = threading.RLock()715 self._queue_update_worker = QueueUpdateWorker()716 def on_before_start(self):717 self._queue_update_worker.start()718 def on_before_stop(self):719 self._queue_update_worker.stop()720 def _require_queue(self, context: RequestContext, name: str) -> SqsQueue:721 """722 Returns the queue for the given name, or raises QueueDoesNotExist if it does not exist.723 :param: context: the request context724 :param name: the name to look for725 :returns: the queue726 :raises QueueDoesNotExist: if the queue does not exist727 """728 backend = SqsBackend.get(context.region)729 with self._mutex:730 if name not in backend.queues.keys():731 raise QueueDoesNotExist("The specified queue does not exist for this wsdl version.")732 return backend.queues[name]733 def _require_queue_by_arn(self, context: RequestContext, queue_arn: str) -> SqsQueue:734 arn = parse_arn(queue_arn)735 return self._require_queue(context, arn["resource"])736 def _resolve_queue(737 self,738 context: RequestContext,739 queue_name: Optional[str] = None,740 queue_url: Optional[str] = None,741 ) -> SqsQueue:742 """743 Determines the name of the queue from available information (request context, queue URL) to return the respective queue,744 or raises QueueDoesNotExist if it does not exist.745 :param context: the request context, used for getting region and account_id, and optionally the queue_url746 :param queue_name: the queue name 
(if this is set, then this will be used for the key)747 :param queue_url: the queue url (if name is not set, this will be used to determine the queue name)748 :returns: the queue749 :raises QueueDoesNotExist: if the queue does not exist750 """751 name = resolve_queue_name(context, queue_name, queue_url)752 return self._require_queue(context, name)753 def create_queue(754 self,755 context: RequestContext,756 queue_name: String,757 attributes: QueueAttributeMap = None,758 tags: TagMap = None,759 ) -> CreateQueueResult:760 fifo = attributes and (761 attributes.get(QueueAttributeName.FifoQueue, "false").lower() == "true"762 )763 # Special Case TODO: why is an emtpy policy passed at all? same in set_queue_attributes764 if attributes and attributes.get(QueueAttributeName.Policy) == "":765 del attributes[QueueAttributeName.Policy]766 backend = SqsBackend.get(context.region)767 with self._mutex:768 if queue_name in backend.queues:769 queue = backend.queues[queue_name]770 if attributes:771 # if attributes are set, then we check whether the existing attributes match the passed ones772 self._validate_queue_attributes(attributes)773 for k, v in attributes.items():774 if queue.attributes.get(k) != v:775 LOG.debug(776 "queue attribute values %s for queue %s do not match %s (existing) != %s (new)",777 k,778 queue_name,779 queue.attributes.get(k),780 v,781 )782 raise QueueNameExists(783 f"A queue already exists with the same name and a different value for attribute {k}"784 )785 return CreateQueueResult(QueueUrl=queue.url(context))786 if config.SQS_DELAY_RECENTLY_DELETED:787 deleted = backend.deleted.get(queue_name)788 if deleted and deleted > (time.time() - RECENTLY_DELETED_TIMEOUT):789 raise QueueDeletedRecently(790 "You must wait 60 seconds after deleting a queue before you can create "791 "another with the same name."792 )793 backend.expire_deleted()794 # create the appropriate queue795 if fifo:796 queue = FifoQueue(queue_name, context.region, context.account_id, attributes, 
tags)797 else:798 queue = StandardQueue(799 queue_name, context.region, context.account_id, attributes, tags800 )801 LOG.debug("creating queue key=%s attributes=%s tags=%s", queue_name, attributes, tags)802 backend.queues[queue_name] = queue803 return CreateQueueResult(QueueUrl=queue.url(context))804 def get_queue_url(805 self, context: RequestContext, queue_name: String, queue_owner_aws_account_id: String = None806 ) -> GetQueueUrlResult:807 backend = SqsBackend.get(context.region)...

Full Screen

Full Screen

redis_helper.py

Source:redis_helper.py Github

copy

Full Screen

from app import user_store as u_store

EXPIRE_LOGS = 3600
EXPIRE_DELETED = 3600 // 4
# time for AWS to delete a pending subscription that hasn't been confirmed
EXPIRE_PENDING = 3600 * 24 * 3
SESSION_LENGTH = 32
ALL_PENDING_KEY = 'global:pending'


def _e(token: str) -> str:
    """Return the store key under which the logs of endpoint *token* are kept."""
    return f"endpoint:{token}"


def _dle(token: str) -> str:
    """Return the store key of the "deleted" marker for endpoint *token*."""
    return f"{_e(token)}:deleted"


def _pe(token: str) -> str:
    """Return the store key of the "pending" marker for endpoint *token*."""
    return f"{_e(token)}:pending"


def get_logs_from_endpoint(token: str):
    """
    Read the full log list of an endpoint and reset the key's expiry to EXPIRE_LOGS.

    Both commands are sent in one pipeline. Returns the list, or None when the
    result is empty/falsy.
    """
    key = _e(token)
    batch = u_store.pipeline()
    batch = batch.lrange(key, 0, -1)
    batch = batch.expire(key, EXPIRE_LOGS)
    # Two commands were queued, so exactly two results are expected back.
    entries, _expire_result = batch.execute()
    return entries or None


def get_len_logs_from_endpoint(token: str) -> int:
    """Return the length of the endpoint's log list."""
    return u_store.llen(_e(token))


def add_logs_to_endpoint(token: str, log_data: str):
    """
    Append *log_data* to the endpoint's log list and reset the key's expiry to EXPIRE_LOGS.

    Returns the result of the push command, or None when that result is falsy.
    """
    key = _e(token)
    batch = u_store.pipeline()
    batch = batch.rpush(key, log_data)
    batch = batch.expire(key, EXPIRE_LOGS)
    pushed, _expire_result = batch.execute()
    return pushed or None


def add_deleted_endpoint(token: str):
    """Set the endpoint's "deleted" marker with an expiry of EXPIRE_DELETED."""
    return u_store.set(_dle(token), "1", ex=EXPIRE_DELETED)


def is_deleted_endpoint(token: str):
    """Return the store's ``exists`` result for the endpoint's "deleted" marker."""
    return u_store.exists(_dle(token))


def add_endpoint_as_pending(token: str):
    """
    Set the endpoint's "pending" marker (expiry EXPIRE_PENDING) and add the token
    to the ALL_PENDING_KEY set, both in one pipeline.

    Returns the pipeline's list of results.
    """
    batch = u_store.pipeline()
    batch.set(_pe(token), "1", ex=EXPIRE_PENDING).sadd(ALL_PENDING_KEY, token)
    return batch.execute()


def del_endpoint_as_pending(token: str):
    """
    Delete the endpoint's "pending" marker and remove the token from the
    ALL_PENDING_KEY set, both in one pipeline.

    Returns the pipeline's list of results.
    """
    batch = u_store.pipeline()
    batch.delete(_pe(token)).srem(ALL_PENDING_KEY, token)
    return batch.execute()


def get_all_pending():
    # NOTE(review): this function is cut off in the excerpt, so its visible
    # statements are kept unchanged rather than restyled.
    pending_list = [pending_endpoint for pending_endpoint in u_store.sscan_iter(ALL_PENDING_KEY)]
    pipe = u_store.pipeline()
    for pending_endpoint in pending_list:
        pipe.get(_pe(pending_endpoint))
    data_resp = pipe.execute()
    pending_endpoints = []
    expired = []
    for index, pending_endpoint_status in enumerate(data_resp):
        if pending_endpoint_status == "1":
            pending_endpoints.append(pending_list[index])
        else:
            expired.append(pending_list[index])
    if expired:
        u_store.srem(ALL_PENDING_KEY, *expired)
    # ... (excerpt ends here; the remainder of this function is not shown)

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful