How to use requeue_inflight_messages method in localstack

Best Python code snippet using localstack_python

provider.py

Source:provider.py Github

copy

Full Screen

...314 # TODO: update message attributes (ApproximateFirstReceiveTimestamp, ApproximateReceiveCount)315 copied_message = copy.deepcopy(standard_message)316 copied_message.message["ReceiptHandle"] = receipt_handle317 return copied_message318 def requeue_inflight_messages(self):319 if not self.inflight:320 return321 with self.mutex:322 messages = list(self.inflight)323 for standard_message in messages:324 if standard_message.is_visible:325 LOG.debug(326 "re-queueing inflight messages %s into queue %s",327 standard_message.message["MessageId"],328 self.arn,329 )330 self.inflight.remove(standard_message)331 self.visible.put_nowait(standard_message)332 def _assert_queue_name(self, name):333 if not re.match(r"^[a-zA-Z0-9_-]{1,80}$", name):334 raise InvalidParameterValue(335 "Can only include alphanumeric characters, hyphens, or underscores. 1 to 80 in length"336 )337 def validate_queue_attributes(self, attributes):338 valid = [k[1] for k in inspect.getmembers(QueueAttributeName)]339 del valid[valid.index(QueueAttributeName.FifoQueue)]340 for k in attributes.keys():341 if k not in valid:342 raise InvalidAttributeName(f"Unknown attribute name {k}")343 def generate_sequence_number(self):344 return None345class StandardQueue(SqsQueue):346 def put(347 self,348 message: Message,349 visibility_timeout: int = None,350 message_deduplication_id: str = None,351 message_group_id: str = None,352 ):353 if message_deduplication_id:354 raise InvalidParameterValue(355 f"Value {message_deduplication_id} for parameter MessageDeduplicationId is invalid. Reason: The "356 f"request includes a parameter that is not valid for this queue type. "357 )358 if message_group_id:359 raise InvalidParameterValue(360 f"Value {message_group_id} for parameter MessageGroupId is invalid. Reason: The request includes a "361 f"parameter that is not valid for this queue type. 
"362 )363 standard_message = SqsMessage(time.time(), message)364 if visibility_timeout is not None:365 standard_message.visibility_timeout = visibility_timeout366 else:367 # use the attribute from the queue368 standard_message.visibility_timeout = self.visibility_timeout369 self.visible.put_nowait(standard_message)370class FifoQueue(SqsQueue):371 visible: PriorityQueue372 inflight: Set[SqsMessage]373 receipts: Dict[str, SqsMessage]374 deduplication: Dict[str, Dict[str, SqsMessage]]375 def __init__(self, key: QueueKey, attributes=None, tags=None) -> None:376 super().__init__(key, attributes, tags)377 self.deduplication = {}378 def put(379 self,380 message: Message,381 visibility_timeout: int = None,382 message_deduplication_id: str = None,383 message_group_id: str = None,384 ):385 if not message_group_id:386 raise MissingParameter("The request must contain the parameter MessageGroupId.")387 dedup_id = message_deduplication_id388 content_based_deduplication = (389 "true"390 == (self.attributes.get(QueueAttributeName.ContentBasedDeduplication, "false")).lower()391 )392 if not dedup_id and content_based_deduplication:393 dedup_id = hashlib.sha256(message.get("Body").encode("utf-8")).hexdigest()394 if not dedup_id:395 raise InvalidParameterValue(396 "The Queue should either have ContentBasedDeduplication enabled or MessageDeduplicationId provided "397 "explicitly "398 )399 qm = SqsMessage(400 time.time(),401 message,402 message_deduplication_id=dedup_id,403 message_group_id=message_group_id,404 )405 if visibility_timeout is not None:406 qm.visibility_timeout = visibility_timeout407 else:408 # use the attribute from the queue409 qm.visibility_timeout = self.visibility_timeout410 original_message = None411 original_message_group = self.deduplication.get(message_group_id)412 if original_message_group:413 original_message = original_message_group.get(dedup_id)414 if (415 original_message416 and not original_message.deleted417 and original_message.priority + 
DEDUPLICATION_INTERVAL_IN_SEC > qm.priority418 ):419 message["MessageId"] = original_message.message["MessageId"]420 else:421 self.visible.put_nowait(qm)422 if not original_message_group:423 self.deduplication[message_group_id] = {}424 self.deduplication[message_group_id][message_deduplication_id] = qm425 def _assert_queue_name(self, name):426 if not name.endswith(".fifo"):427 raise InvalidParameterValue(428 "The name of a FIFO queue can only include alphanumeric characters, hyphens, or underscores, "429 "must end with .fifo suffix and be 1 to 80 in length"430 )431 # The .fifo suffix counts towards the 80-character queue name quota.432 queue_name = name[:-5] + "_fifo"433 super()._assert_queue_name(queue_name)434 def validate_queue_attributes(self, attributes):435 valid = [k[1] for k in inspect.getmembers(QueueAttributeName)]436 for k in attributes.keys():437 if k not in valid:438 raise InvalidAttributeName(f"Unknown attribute name {k}")439 # Special Cases440 fifo = attributes.get(QueueAttributeName.FifoQueue)441 if fifo and fifo.lower() != "true":442 raise InvalidAttributeValue(443 "Invalid value for the parameter FifoQueue. Reason: Modifying queue type is not supported."444 )445 # TODO: If we ever actually need to do something with this number, it needs to be part of446 # SQSMessage. This means changing all *put*() signatures to return the saved message.447 def generate_sequence_number(self):448 return _create_mock_sequence_number()449class InflightUpdateWorker:450 """451 Regularly re-queues inflight messages whose visibility timeout has expired.452 FIXME: very crude implementation. 
it would be better to have event-driven communication.453 """454 queues: Dict[QueueKey, SqsQueue]455 def __init__(self, queues: Dict[QueueKey, SqsQueue]) -> None:456 super().__init__()457 self.queues = queues458 self.running = False459 self.thread: Optional[FuncThread] = None460 def start(self):461 if self.thread:462 return463 def _run(*_args):464 self.running = True465 self.run()466 self.thread = start_thread(_run)467 def stop(self):468 if self.thread:469 self.thread.stop()470 self.running = False471 self.thread = None472 def run(self):473 while self.running:474 time.sleep(1)475 for queue in self.queues.values():476 queue.requeue_inflight_messages()477def check_attributes(message_attributes: MessageBodyAttributeMap):478 if not message_attributes:479 return480 for attribute_name in message_attributes:481 if len(attribute_name) >= 256:482 raise InvalidParameterValue(483 "Message (user) attribute names must be shorter than 256 Bytes"484 )485 if not re.match(ATTR_NAME_CHAR_REGEX, attribute_name.lower()):486 raise InvalidParameterValue(487 "Message (user) attributes name can only contain upper and lower score characters, digits, periods, "488 "hyphens and underscores. "489 )490 if not re.match(ATTR_NAME_PREFIX_SUFFIX_REGEX, attribute_name.lower()):...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful