Best Python code snippet using localstack_python
provider.py
Source:provider.py  
...853            if (854                queue.attributes855                and queue.attributes.get(QueueAttributeName.RedrivePolicy) is not None856            ):857                moved_to_dlq = self._dead_letter_check(queue, standard_message, context)858            if moved_to_dlq:859                continue860            # filter attributes861            if message_attribute_names:862                if "All" not in message_attribute_names:863                    msg["MessageAttributes"] = {864                        k: v865                        for k, v in msg["MessageAttributes"].items()866                        if k in message_attribute_names867                    }868                # TODO: why is this called even if we receive "All" attributes?869                msg["MD5OfMessageAttributes"] = _create_message_attribute_hash(870                    msg["MessageAttributes"]871                )872            else:873                del msg["MessageAttributes"]874            # add message to result875            messages.append(msg)876            num -= 1877        # TODO: how does receiving behave if the queue was deleted in the meantime?878        return ReceiveMessageResult(Messages=messages)879    def _dead_letter_check(880        self, queue: SqsQueue, std_m: SqsMessage, context: RequestContext881    ) -> bool:882        redrive_policy = json.loads(queue.attributes.get(QueueAttributeName.RedrivePolicy))883        # TODO: include the names of the dictionary sub - attributes in the autogenerated code?884        max_receive_count = redrive_policy["maxReceiveCount"]885        if std_m.receive_times > max_receive_count:886            dead_letter_target_arn = redrive_policy["deadLetterTargetArn"]887            dl_queue = self._require_queue_by_arn(dead_letter_target_arn)888            # TODO: this needs to be atomic?889            dead_message = std_m.message890            dl_queue.put(message=dead_message)891            
queue.remove(std_m.message["ReceiptHandle"])892            return True893        else:...test_sqs.py
Source:test_sqs.py  
...28    md5_listener = get_message_attributes_md5(msg_attrs_listener)29    msg_attrs_provider = {"timestamp": {"StringValue": "1493147359900", "DataType": "Number"}}30    md5_provider = provider._create_message_attribute_hash(msg_attrs_provider)31    assert md5_provider == md5_listener32def test_handle_string_max_receive_count_in_dead_letter_check():33    # fmt: off34    policy = {"RedrivePolicy": "{\"deadLetterTargetArn\":\"arn:aws:sqs:us-east-1:000000000000:DeadLetterQueue\",\"maxReceiveCount\": \"5\" }"}35    # fmt: on36    queue = provider.SqsQueue("TestQueue", "us-east-1", "123456789", policy)37    sqs_message = provider.SqsMessage(Message(), {})38    result = provider.SqsProvider()._dead_letter_check(queue, sqs_message, None)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!
