How to use the enqueue_delayed_messages method in LocalStack

Best Python code snippet using localstack_python

provider.py

Source: provider.py (GitHub)

copy

Full Screen

...443 self.arn,444 )445 self.inflight.remove(standard_message)446 self.visible.put_nowait(standard_message)447 def enqueue_delayed_messages(self):448 if not self.delayed:449 return450 with self.mutex:451 messages = [message for message in self.delayed if not message.is_delayed]452 for standard_message in messages:453 LOG.debug(454 "enqueueing delayed messages %s into queue %s",455 standard_message.message["MessageId"],456 self.arn,457 )458 self.delayed.remove(standard_message)459 self.visible.put_nowait(standard_message)460 def _assert_queue_name(self, name):461 if not re.match(r"^[a-zA-Z0-9_-]{1,80}$", name):462 raise InvalidParameterValue(463 "Can only include alphanumeric characters, hyphens, or underscores. 1 to 80 in length"464 )465 def validate_queue_attributes(self, attributes):466 valid = [k[1] for k in inspect.getmembers(QueueAttributeName)]467 del valid[valid.index(QueueAttributeName.FifoQueue)]468 for k in attributes.keys():469 if k not in valid:470 raise InvalidAttributeName(f"Unknown Attribute {k}.")471 def generate_sequence_number(self):472 return None473class StandardQueue(SqsQueue):474 def put(475 self,476 message: Message,477 visibility_timeout: int = None,478 message_deduplication_id: str = None,479 message_group_id: str = None,480 delay_seconds: int = None,481 ):482 if message_deduplication_id:483 raise InvalidParameterValue(484 f"Value {message_deduplication_id} for parameter MessageDeduplicationId is invalid. Reason: The "485 f"request includes a parameter that is not valid for this queue type. "486 )487 if message_group_id:488 raise InvalidParameterValue(489 f"Value {message_group_id} for parameter MessageGroupId is invalid. Reason: The request includes a "490 f"parameter that is not valid for this queue type. 
"491 )492 standard_message = SqsMessage(time.time(), message)493 if visibility_timeout is not None:494 standard_message.visibility_timeout = visibility_timeout495 else:496 # use the attribute from the queue497 standard_message.visibility_timeout = self.visibility_timeout498 if delay_seconds is not None:499 standard_message.delay_seconds = delay_seconds500 else:501 standard_message.delay_seconds = self.delay_seconds502 if standard_message.is_delayed:503 self.delayed.add(standard_message)504 else:505 self.visible.put_nowait(standard_message)506class FifoQueue(SqsQueue):507 deduplication: Dict[str, Dict[str, SqsMessage]]508 def __init__(self, name: str, region: str, account_id: str, attributes=None, tags=None) -> None:509 super().__init__(name, region, account_id, attributes, tags)510 self.deduplication = {}511 def update_delay_seconds(self, value: int):512 super(FifoQueue, self).update_delay_seconds(value)513 for message in self.delayed:514 message.delay_seconds = value515 def put(516 self,517 message: Message,518 visibility_timeout: int = None,519 message_deduplication_id: str = None,520 message_group_id: str = None,521 delay_seconds: int = None,522 ):523 if delay_seconds is not None:524 # in fifo queues, delay is only applied on queue level525 raise InvalidParameterValue(526 f"Value {delay_seconds} for parameter DelaySeconds is invalid. 
Reason: The request include parameter "527 f"that is not valid for this queue type."528 )529 if not message_group_id:530 raise MissingParameter("The request must contain the parameter MessageGroupId.")531 dedup_id = message_deduplication_id532 content_based_deduplication = (533 "true"534 == (self.attributes.get(QueueAttributeName.ContentBasedDeduplication, "false")).lower()535 )536 if not dedup_id and content_based_deduplication:537 dedup_id = hashlib.sha256(message.get("Body").encode("utf-8")).hexdigest()538 if not dedup_id:539 raise InvalidParameterValue(540 "The Queue should either have ContentBasedDeduplication enabled or MessageDeduplicationId provided "541 "explicitly "542 )543 fifo_message = SqsMessage(544 time.time(),545 message,546 message_deduplication_id=dedup_id,547 message_group_id=message_group_id,548 )549 if visibility_timeout is not None:550 fifo_message.visibility_timeout = visibility_timeout551 else:552 # use the attribute from the queue553 fifo_message.visibility_timeout = self.visibility_timeout554 if delay_seconds is not None:555 fifo_message.delay_seconds = delay_seconds556 else:557 fifo_message.delay_seconds = self.delay_seconds558 original_message = None559 original_message_group = self.deduplication.get(message_group_id)560 if original_message_group:561 original_message = original_message_group.get(dedup_id)562 if (563 original_message564 and not original_message.deleted565 and original_message.priority + DEDUPLICATION_INTERVAL_IN_SEC > fifo_message.priority566 ):567 message["MessageId"] = original_message.message["MessageId"]568 else:569 if fifo_message.is_delayed:570 self.delayed.add(fifo_message)571 else:572 self.visible.put_nowait(fifo_message)573 if not original_message_group:574 self.deduplication[message_group_id] = {}575 self.deduplication[message_group_id][dedup_id] = fifo_message576 def _assert_queue_name(self, name):577 if not name.endswith(".fifo"):578 raise InvalidParameterValue(579 "The name of a FIFO queue can only include 
alphanumeric characters, hyphens, or underscores, "580 "must end with .fifo suffix and be 1 to 80 in length"581 )582 # The .fifo suffix counts towards the 80-character queue name quota.583 queue_name = name[:-5] + "_fifo"584 super()._assert_queue_name(queue_name)585 def validate_queue_attributes(self, attributes):586 valid = [k[1] for k in inspect.getmembers(QueueAttributeName)]587 for k in attributes.keys():588 if k not in valid:589 raise InvalidAttributeName(f"Unknown Attribute {k}.")590 # Special Cases591 fifo = attributes.get(QueueAttributeName.FifoQueue)592 if fifo and fifo.lower() != "true":593 raise InvalidAttributeValue(594 "Invalid value for the parameter FifoQueue. Reason: Modifying queue type is not supported."595 )596 # TODO: If we ever actually need to do something with this number, it needs to be part of597 # SQSMessage. This means changing all *put*() signatures to return the saved message.598 def generate_sequence_number(self):599 return _create_mock_sequence_number()600class QueueUpdateWorker:601 """602 Regularly re-queues inflight and delayed messages whose visibility timeout has expired or delay deadline has been603 reached.604 """605 def __init__(self) -> None:606 super().__init__()607 self.scheduler = Scheduler()608 self.thread: Optional[FuncThread] = None609 self.mutex = threading.RLock()610 def do_update_all_queues(self):611 for region in SqsBackend.regions().keys():612 backend = SqsBackend.get(region)613 for queue in backend.queues.values():614 try:615 queue.requeue_inflight_messages()616 except Exception:617 LOG.exception("error re-queueing inflight messages")618 try:619 queue.enqueue_delayed_messages()620 except Exception:621 LOG.exception("error enqueueing delayed messages")622 def start(self):623 with self.mutex:624 if self.thread:625 return626 self.scheduler = Scheduler()627 self.scheduler.schedule(self.do_update_all_queues, period=1)628 def _run(*_args):629 self.scheduler.run()630 self.thread = start_thread(_run)631 def stop(self):632 
with self.mutex:633 if self.scheduler:...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful