How to use encode_receipt_handle method in localstack

Best Python code snippet using localstack_python

provider.py

Source:provider.py Github

copy

Full Screen

# ... (excerpt of provider.py; the listing starts at original line 124, earlier lines are not shown)


def check_message_content(message_body: str):
    """
    Validate a message body against ``MSG_CONTENT_REGEX``.

    :param message_body: the message body to check
    :raises InvalidMessageContents: if the body does not match ``MSG_CONTENT_REGEX``
    """
    error = "Invalid characters found. Valid unicode characters are #x9 | #xA | #xD | #x20 to #xD7FF | #xE000 to #xFFFD | #x10000 to #x10FFFF"
    if not re.match(MSG_CONTENT_REGEX, message_body):
        raise InvalidMessageContents(error)


def encode_receipt_handle(queue_arn, message: "SqsMessage") -> str:
    """
    Build a base64-encoded receipt handle for the given message.

    The plain-text handle consists of four space-separated fields: a random uid, the queue ARN, the message id
    and the message's ``last_received`` timestamp.

    :param queue_arn: the ARN of the queue the message was received from
    :param message: the message to create the handle for
    :return: the base64-encoded handle as a str
    """
    # http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/ImportantIdentifiers.html#ImportantIdentifiers-receipt-handles
    # encode the queue arn in the receipt handle, so we can later check if it belongs to the queue
    # but also add some randomness s.t. the generated receipt handles look like the ones from AWS
    handle = f"{long_uid()} {queue_arn} {message.message.get('MessageId')} {message.last_received}"
    encoded = base64.b64encode(handle.encode("utf-8"))
    return encoded.decode("utf-8")


def decode_receipt_handle(receipt_handle: str) -> str:
    """
    Decode a receipt handle created by ``encode_receipt_handle`` and return the queue ARN it contains.

    :param receipt_handle: the base64-encoded receipt handle
    :return: the queue ARN encoded in the handle
    :raises ReceiptHandleIsInvalid: if the handle cannot be decoded, does not consist of exactly four
        space-separated fields, or the queue ARN field is not an ARN
    """
    try:
        handle = base64.b64decode(receipt_handle).decode("utf-8")
        # only the queue arn is needed, the uid, message id and timestamp fields are ignored
        _, queue_arn, _, _ = handle.split(" ")
        parse_arn(queue_arn)  # raises a ValueError if it is not an arn
        return queue_arn
    except (IndexError, ValueError):
        raise ReceiptHandleIsInvalid(
            f'The input receipt handle "{receipt_handle}" is not a valid receipt handle.'
        )


class Permission(NamedTuple):
    # TODO: just a placeholder for real policies
    label: str
    account_id: str
    action: str


class SqsMessage:
    """
    Wraps a ``Message`` together with the bookkeeping state (receive counters, receipt handles, visibility and
    delay information) that a queue keeps for it.

    Ordering comparisons (``<``, ``<=``, ``>``, ``>=``) use ``priority``, whereas equality and hashing use the
    ``MessageId`` of the wrapped message.
    """

    message: Message
    created: float
    visibility_timeout: int
    receive_times: int
    delay_seconds: Optional[int]
    receipt_handles: Set[str]
    last_received: Optional[float]
    first_received: Optional[float]
    visibility_deadline: Optional[float]
    deleted: bool
    priority: float
    # message_group_id and message_deduplication_id are exposed as read-only properties (see below)

    def __init__(
        self,
        priority: float,
        message: Message,
        message_deduplication_id: Optional[str] = None,
        message_group_id: Optional[str] = None,
    ) -> None:
        """
        :param priority: the value used for ordering comparisons between messages
        :param message: the message to wrap; its "Attributes" entry is created or updated in place
        :param message_deduplication_id: stored as the "MessageDeduplicationId" attribute if not None
        :param message_group_id: stored as the "MessageGroupId" attribute if not None
        """
        self.created = time.time()
        self.message = message
        self.receive_times = 0
        self.receipt_handles = set()

        self.delay_seconds = None
        self.last_received = None
        self.first_received = None
        # a message that was never received has no deadline; ``is_visible`` checks for None explicitly, so the
        # attribute has to exist from the start
        self.visibility_deadline = None
        self.deleted = False
        self.priority = priority

        attributes = {}
        if message_group_id is not None:
            attributes["MessageGroupId"] = message_group_id
        if message_deduplication_id is not None:
            attributes["MessageDeduplicationId"] = message_deduplication_id

        if self.message.get("Attributes"):
            self.message["Attributes"].update(attributes)
        else:
            self.message["Attributes"] = attributes

    @property
    def message_group_id(self) -> Optional[str]:
        return self.message["Attributes"].get("MessageGroupId")

    @property
    def message_deduplication_id(self) -> Optional[str]:
        return self.message["Attributes"].get("MessageDeduplicationId")

    def set_last_received(self, timestamp: float):
        """
        Sets the last received timestamp of the message to the given value, and updates the visibility deadline
        accordingly.

        :param timestamp: the last time the message was received
        """
        self.last_received = timestamp
        self.visibility_deadline = timestamp + self.visibility_timeout

    def update_visibility_timeout(self, timeout: int):
        """
        Sets the visibility timeout of the message to the given value, and updates the visibility deadline accordingly.

        :param timeout: the timeout value in seconds
        """
        self.visibility_timeout = timeout
        self.visibility_deadline = time.time() + timeout

    @property
    def is_visible(self) -> bool:
        """
        Returns false if the message has a visibility deadline that is in the future.

        :return: whether the message is visible or not.
        """
        if self.visibility_deadline is None:
            return True
        if time.time() >= self.visibility_deadline:
            return True

        return False

    @property
    def is_delayed(self) -> bool:
        """
        Returns true if the message has a delay and ``created + delay_seconds`` has not passed yet.
        """
        if self.delay_seconds is None:
            return False
        return time.time() <= self.created + self.delay_seconds

    def __gt__(self, other):
        return self.priority > other.priority

    def __ge__(self, other):
        return self.priority >= other.priority

    def __lt__(self, other):
        return self.priority < other.priority

    def __le__(self, other):
        return self.priority <= other.priority

    def __eq__(self, other):
        if not isinstance(other, SqsMessage):
            # let python fall back to its default handling instead of failing on ``other.message``
            return NotImplemented
        return self.message["MessageId"] == other.message["MessageId"]

    def __hash__(self):
        return hash(self.message["MessageId"])


class SqsQueue:
    """
    Base queue implementation that keeps messages in three collections: ``visible`` (a priority queue of
    messages that can be received), ``inflight`` (messages that were received and added to the in-flight set)
    and ``delayed``. ``receipts`` maps every issued receipt handle to its message.

    ``put`` is not implemented here and raises ``NotImplementedError``.
    """

    name: str
    region: str
    account_id: str

    attributes: QueueAttributeMap
    tags: TagMap
    permissions: Set[Permission]

    purge_in_progress: bool

    visible: PriorityQueue
    delayed: Set[SqsMessage]
    inflight: Set[SqsMessage]
    receipts: Dict[str, SqsMessage]

    def __init__(self, name: str, region: str, account_id: str, attributes=None, tags=None) -> None:
        """
        :param name: the queue name (checked via ``_assert_queue_name``)
        :param region: the region used to build the ARN and URL
        :param account_id: the account id used to build the ARN and URL
        :param attributes: optional attributes that override the defaults from ``default_attributes``
        :param tags: optional queue tags
        """
        self.name = name
        self.region = region
        self.account_id = account_id

        self._assert_queue_name(name)
        self.tags = tags or {}

        self.visible = PriorityQueue()
        self.delayed = set()
        self.inflight = set()
        self.receipts = {}

        # default_attributes() references the collections and the arn, so it has to be called after they are set
        self.attributes = self.default_attributes()
        if attributes:
            self.attributes.update(attributes)

        self.purge_in_progress = False
        self.permissions = set()
        self.mutex = threading.RLock()

    def default_attributes(self) -> QueueAttributeMap:
        """
        Returns the default queue attributes. The three ``ApproximateNumberOfMessages*`` entries are callables
        that compute the current value on demand, all other entries are strings.
        """
        return {
            QueueAttributeName.ApproximateNumberOfMessages: self.visible._qsize,
            QueueAttributeName.ApproximateNumberOfMessagesNotVisible: lambda: len(self.inflight),
            QueueAttributeName.ApproximateNumberOfMessagesDelayed: lambda: len(self.delayed),
            QueueAttributeName.CreatedTimestamp: str(now()),
            QueueAttributeName.DelaySeconds: "0",
            QueueAttributeName.LastModifiedTimestamp: str(now()),
            QueueAttributeName.MaximumMessageSize: "262144",
            QueueAttributeName.MessageRetentionPeriod: "345600",
            QueueAttributeName.QueueArn: self.arn,
            QueueAttributeName.ReceiveMessageWaitTimeSeconds: "0",
            QueueAttributeName.VisibilityTimeout: "30",
            QueueAttributeName.SqsManagedSseEnabled: "false",
        }

    def update_delay_seconds(self, value: int):
        """
        For standard queues, the per-queue delay setting is not retroactive—changing the setting doesn't affect the delay of messages already in the queue.
        For FIFO queues, the per-queue delay setting is retroactive—changing the setting affects the delay of messages already in the queue.

        https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-delay-queues.html

        :param value: the number of seconds
        """
        self.attributes[QueueAttributeName.DelaySeconds] = str(value)

    def update_last_modified(self, timestamp: Optional[int] = None):
        """
        Sets the ``LastModifiedTimestamp`` attribute.

        :param timestamp: the timestamp to store, defaults to ``now()``
        """
        if timestamp is None:
            timestamp = now()

        self.attributes[QueueAttributeName.LastModifiedTimestamp] = str(timestamp)

    @property
    def arn(self) -> str:
        return f"arn:aws:sqs:{self.region}:{self.account_id}:{self.name}"

    def url(self, context: RequestContext) -> str:
        """Return queue URL using either SQS_PORT_EXTERNAL (if configured), the SQS_ENDPOINT_STRATEGY (if configured)
        or based on the 'Host' request header"""

        host_url = context.request.host_url

        if config.SQS_ENDPOINT_STRATEGY == "domain":
            # queue.localhost.localstack.cloud:4566/000000000000/my-queue (us-east-1)
            # or us-east-2.queue.localhost.localstack.cloud:4566/000000000000/my-queue
            region = "" if self.region == "us-east-1" else self.region + "."
            scheme = context.request.scheme
            host_url = f"{scheme}://{region}queue.{constants.LOCALHOST_HOSTNAME}:{config.EDGE_PORT}"
        elif config.SQS_ENDPOINT_STRATEGY == "path":
            # https?://localhost:4566/queue/us-east-1/00000000000/my-queue (us-east-1)
            # the request's host_url may end with a slash; strip it so the result does not contain "//queue"
            host_url = f"{host_url.rstrip('/')}/queue/{self.region}"
        else:
            if config.SQS_PORT_EXTERNAL:
                host_url = external_service_url("sqs")

        return "{host}/{account_id}/{name}".format(
            host=host_url.rstrip("/"),
            account_id=self.account_id,
            name=self.name,
        )

    @property
    def visibility_timeout(self) -> int:
        return int(self.attributes[QueueAttributeName.VisibilityTimeout])

    @property
    def delay_seconds(self) -> int:
        return int(self.attributes[QueueAttributeName.DelaySeconds])

    @property
    def wait_time_seconds(self) -> int:
        return int(self.attributes[QueueAttributeName.ReceiveMessageWaitTimeSeconds])

    def validate_receipt_handle(self, receipt_handle: str):
        """
        Checks that the receipt handle decodes to the ARN of this queue.

        :param receipt_handle: the receipt handle to check
        :raises ReceiptHandleIsInvalid: if the handle is malformed or contains a different queue ARN
        """
        if self.arn != decode_receipt_handle(receipt_handle):
            raise ReceiptHandleIsInvalid(
                f'The input receipt handle "{receipt_handle}" is not a valid receipt handle.'
            )

    def update_visibility_timeout(self, receipt_handle: str, visibility_timeout: int):
        """
        Changes the visibility timeout of the in-flight message identified by the receipt handle. A timeout of
        0 moves the message from the in-flight set back into the visible queue.

        :param receipt_handle: a receipt handle issued by this queue
        :param visibility_timeout: the new timeout in seconds
        :raises ReceiptHandleIsInvalid: if the handle does not belong to this queue
        :raises InvalidParameterValue: if the handle is not known to this queue
        :raises MessageNotInflight: if the message is not in the in-flight set
        """
        with self.mutex:
            self.validate_receipt_handle(receipt_handle)

            if receipt_handle not in self.receipts:
                raise InvalidParameterValue(
                    f"Value {receipt_handle} for parameter ReceiptHandle is invalid. Reason: Message does not exist "
                    f"or is not available for visibility timeout change."
                )

            standard_message = self.receipts[receipt_handle]

            if standard_message not in self.inflight:
                raise MessageNotInflight()

            standard_message.update_visibility_timeout(visibility_timeout)

            if visibility_timeout == 0:
                LOG.info(
                    "terminating the visibility timeout of %s",
                    standard_message.message["MessageId"],
                )
                # Terminating the visibility timeout for a message
                # https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html#terminating-message-visibility-timeout
                self.inflight.remove(standard_message)
                self.visible.put_nowait(standard_message)

    def remove(self, receipt_handle: str):
        """
        Deletes the message identified by the receipt handle: marks it as deleted, drops all of its receipt
        handles and removes it from the in-flight set, or from the visible queue if it is no longer in flight.
        An unknown (but well-formed) handle is logged and ignored.

        :param receipt_handle: a receipt handle issued by this queue
        :raises ReceiptHandleIsInvalid: if the handle does not belong to this queue
        """
        with self.mutex:
            self.validate_receipt_handle(receipt_handle)

            if receipt_handle not in self.receipts:
                LOG.debug(
                    "no in-flight message found for receipt handle %s in queue %s",
                    receipt_handle,
                    self.arn,
                )
                return

            standard_message = self.receipts[receipt_handle]
            standard_message.deleted = True
            LOG.debug(
                "deleting message %s from queue %s",
                standard_message.message["MessageId"],
                self.arn,
            )

            # remove all handles
            for handle in standard_message.receipt_handles:
                del self.receipts[handle]
            standard_message.receipt_handles.clear()

            # remove in-flight message
            try:
                self.inflight.remove(standard_message)
            except KeyError:
                # this means the message was re-queued in the meantime
                # a message can be removed with an old receipt handle that was issued before the message was
                # put back in the visible queue, so remove it from there.
                try:
                    self.visible.queue.remove(standard_message)
                except ValueError:
                    # the message is in neither collection (e.g., it was already taken out of the visible
                    # queue); it is marked as deleted, which get() checks before handing a message out
                    LOG.debug(
                        "message %s not found in visible queue of %s",
                        standard_message.message["MessageId"],
                        self.arn,
                    )
                else:
                    # list.remove breaks the heap invariant of the underlying list, so restore it
                    heapq.heapify(self.visible.queue)

    def put(
        self,
        message: Message,
        visibility_timeout: Optional[int] = None,
        message_deduplication_id: Optional[str] = None,
        message_group_id: Optional[str] = None,
        delay_seconds: Optional[int] = None,
    ):
        raise NotImplementedError

    def get(self, block=True, timeout=None, visibility_timeout: Optional[int] = None) -> SqsMessage:
        """
        Takes the next message from the visible queue, updates its receive state, issues a new receipt handle
        and returns a deep copy of the message that carries the handle and the receive attributes. Messages
        that are marked as deleted are skipped.

        :param block: passed to the underlying ``visible.get`` call
        :param timeout: overall timeout in seconds passed to ``visible.get``; None means no timeout
        :param visibility_timeout: visibility timeout for the message, defaults to the queue's timeout
        :return: a copy of the received message
        """
        # with a timeout, track an absolute deadline so that time spent on skipped (deleted) messages is
        # subtracted exactly once; without a timeout there is nothing to subtract
        deadline = None if timeout is None else time.time() + timeout
        remaining = timeout

        while True:
            standard_message: SqsMessage = self.visible.get(block=block, timeout=remaining)
            LOG.debug(
                "de-queued message %s from %s", standard_message.message["MessageId"], self.arn
            )

            with self.mutex:
                if standard_message.deleted:
                    # TODO: check what the behavior of AWS is here. should we return a deleted message?
                    if deadline is not None:
                        remaining = max(0, deadline - time.time())
                    continue

                # update message attributes
                standard_message.visibility_timeout = (
                    self.visibility_timeout if visibility_timeout is None else visibility_timeout
                )
                standard_message.receive_times += 1
                standard_message.set_last_received(time.time())
                if standard_message.first_received is None:
                    standard_message.first_received = standard_message.last_received

                # create and manage receipt handle
                receipt_handle = self.create_receipt_handle(standard_message)
                standard_message.receipt_handles.add(receipt_handle)
                self.receipts[receipt_handle] = standard_message

                if standard_message.visibility_timeout == 0:
                    self.visible.put_nowait(standard_message)
                else:
                    self.inflight.add(standard_message)

            # prepare message for receiver
            copied_message = copy.deepcopy(standard_message)
            copied_message.message["Attributes"][
                MessageSystemAttributeName.ApproximateReceiveCount
            ] = str(standard_message.receive_times)
            copied_message.message["Attributes"][
                MessageSystemAttributeName.ApproximateFirstReceiveTimestamp
            ] = str(int(standard_message.first_received * 1000))
            copied_message.message["ReceiptHandle"] = receipt_handle

            return copied_message

    def create_receipt_handle(self, message: SqsMessage) -> str:
        return encode_receipt_handle(self.arn, message)

    def requeue_inflight_messages(self):
        """
        Moves all in-flight messages whose ``is_visible`` is true back into the visible queue.
        """
        if not self.inflight:
            return

        with self.mutex:
            messages = [message for message in self.inflight if message.is_visible]
            for standard_message in messages:
                LOG.debug(
                    "re-queueing inflight messages %s into queue %s",
                    standard_message.message["MessageId"],
                    self.arn,
                )
                self.inflight.remove(standard_message)
                self.visible.put_nowait(standard_message)

    def enqueue_delayed_messages(self):
        # NOTE: the listing is truncated here; the body of this method is not part of the excerpt
        ...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful