Best Python code snippet using localstack_python
provider.py
Source:provider.py  
...988        self._assert_batch(entries)989        # check the total batch size first and raise BatchRequestTooLong id > DEFAULT_MAXIMUM_MESSAGE_SIZE.990        # This is checked before any messages in the batch are sent.  Raising the exception here should991        # cause error response, rather than batching error results and returning992        self._assert_valid_batch_size(entries, DEFAULT_MAXIMUM_MESSAGE_SIZE)993        successful = []994        failed = []995        with queue.mutex:996            for entry in entries:997                try:998                    queue_item = self._put_message(999                        queue,1000                        context,1001                        message_body=entry.get("MessageBody"),1002                        delay_seconds=entry.get("DelaySeconds"),1003                        message_attributes=entry.get("MessageAttributes"),1004                        message_system_attributes=entry.get("MessageSystemAttributes"),1005                        message_deduplication_id=entry.get("MessageDeduplicationId"),1006                        message_group_id=entry.get("MessageGroupId"),1007                    )1008                    message = queue_item.message1009                    successful.append(1010                        SendMessageBatchResultEntry(1011                            Id=entry["Id"],1012                            MessageId=message.get("MessageId"),1013                            MD5OfMessageBody=message.get("MD5OfBody"),1014                            MD5OfMessageAttributes=message.get("MD5OfMessageAttributes"),1015                            MD5OfMessageSystemAttributes=_create_message_attribute_hash(1016                                message.get("message_system_attributes")1017                            ),1018                            SequenceNumber=queue_item.sequence_number,1019                        )1020                    )1021                except Exception as e:1022                    failed.append(1023                        BatchResultErrorEntry(1024                            Id=entry["Id"],1025                            SenderFault=False,1026                            Code=e.__class__.__name__,1027                            Message=str(e),1028                        )1029                    )1030        return SendMessageBatchResult(1031            Successful=successful,1032            Failed=failed,1033        )1034    def _put_message(1035        self,1036        queue: SqsQueue,1037        context: RequestContext,1038        message_body: String,1039        delay_seconds: Integer = None,1040        message_attributes: MessageBodyAttributeMap = None,1041        message_system_attributes: MessageBodySystemAttributeMap = None,1042        message_deduplication_id: String = None,1043        message_group_id: String = None,1044    ) -> SqsMessage:1045        check_message_content(message_body)1046        check_attributes(message_attributes)1047        check_attributes(message_system_attributes)1048        check_fifo_id(message_deduplication_id)1049        check_fifo_id(message_group_id)1050        message = Message(1051            MessageId=generate_message_id(),1052            MD5OfBody=md5(message_body),1053            Body=message_body,1054            Attributes=self._create_message_attributes(context, message_system_attributes),1055            MD5OfMessageAttributes=_create_message_attribute_hash(message_attributes),1056            MessageAttributes=message_attributes,1057        )1058        return queue.put(1059            message=message,1060            message_deduplication_id=message_deduplication_id,1061            message_group_id=message_group_id,1062            delay_seconds=int(delay_seconds) if delay_seconds is not None else None,1063        )1064    def receive_message(1065        self,1066        context: RequestContext,1067        queue_url: String,1068        attribute_names: AttributeNameList = None,1069        message_attribute_names: MessageAttributeNameList = None,1070        max_number_of_messages: Integer = None,1071        visibility_timeout: Integer = None,1072        wait_time_seconds: Integer = None,1073        receive_request_attempt_id: String = None,1074    ) -> ReceiveMessageResult:1075        queue = self._resolve_queue(context, queue_url=queue_url)1076        if wait_time_seconds is None:1077            wait_time_seconds = queue.wait_time_seconds1078        num = max_number_of_messages or 11079        block = True if wait_time_seconds else False1080        # collect messages1081        messages = []1082        # we chose to always return the maximum possible number of messages, even though AWS will typically return1083        # fewer messages than requested on small queues. at some point we could maybe change this to randomly sample1084        # between 1 and max_number_of_messages.1085        # see https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_ReceiveMessage.html1086        while num:1087            try:1088                standard_message = queue.get(1089                    block=block, timeout=wait_time_seconds, visibility_timeout=visibility_timeout1090                )1091                msg = standard_message.message1092            except Empty:1093                break1094            # setting block to false guarantees that, if we've already waited before, we don't wait the full time1095            # again in the next iteration if max_number_of_messages is set but there are no more messages in the1096            # queue. see https://github.com/localstack/localstack/issues/58241097            block = False1098            moved_to_dlq = False1099            if (1100                queue.attributes1101                and queue.attributes.get(QueueAttributeName.RedrivePolicy) is not None1102            ):1103                moved_to_dlq = self._dead_letter_check(queue, standard_message, context)1104            if moved_to_dlq:1105                continue1106            msg = copy.deepcopy(msg)1107            message_filter_attributes(msg, attribute_names)1108            message_filter_message_attributes(msg, message_attribute_names)1109            if msg.get("MessageAttributes"):1110                msg["MD5OfMessageAttributes"] = _create_message_attribute_hash(1111                    msg["MessageAttributes"]1112                )1113            else:1114                # delete the value that was computed when creating the message1115                msg.pop("MD5OfMessageAttributes")1116            # add message to result1117            messages.append(msg)1118            num -= 11119        # TODO: how does receiving behave if the queue was deleted in the meantime?1120        return ReceiveMessageResult(Messages=messages)1121    def _dead_letter_check(1122        self, queue: SqsQueue, std_m: SqsMessage, context: RequestContext1123    ) -> bool:1124        redrive_policy = json.loads(queue.attributes.get(QueueAttributeName.RedrivePolicy))1125        # TODO: include the names of the dictionary sub - attributes in the autogenerated code?1126        max_receive_count = int(redrive_policy["maxReceiveCount"])1127        if std_m.receive_times > max_receive_count:1128            dead_letter_target_arn = redrive_policy["deadLetterTargetArn"]1129            dl_queue = self._require_queue_by_arn(context, dead_letter_target_arn)1130            # TODO: this needs to be atomic?1131            dead_message = std_m.message1132            dl_queue.put(1133                message=dead_message,1134                message_deduplication_id=std_m.message_deduplication_id,1135                message_group_id=std_m.message_group_id,1136            )1137            queue.remove(std_m.message["ReceiptHandle"])1138            return True1139        else:1140            return False1141    def list_dead_letter_source_queues(1142        self,1143        context: RequestContext,1144        queue_url: String,1145        next_token: Token = None,1146        max_results: BoxedInteger = None,1147    ) -> ListDeadLetterSourceQueuesResult:1148        urls = []1149        backend = SqsBackend.get(context.region)1150        dead_letter_queue = self._resolve_queue(context, queue_url=queue_url)1151        for queue in backend.queues.values():1152            policy = queue.attributes.get(QueueAttributeName.RedrivePolicy)1153            if policy:1154                policy = json.loads(policy)1155                dlq_arn = policy.get("deadLetterTargetArn")1156                if dlq_arn == dead_letter_queue.arn:1157                    urls.append(queue.url(context))1158        return ListDeadLetterSourceQueuesResult(queueUrls=urls)1159    def delete_message(1160        self, context: RequestContext, queue_url: String, receipt_handle: String1161    ) -> None:1162        queue = self._resolve_queue(context, queue_url=queue_url)1163        queue.remove(receipt_handle)1164    def delete_message_batch(1165        self,1166        context: RequestContext,1167        queue_url: String,1168        entries: DeleteMessageBatchRequestEntryList,1169    ) -> DeleteMessageBatchResult:1170        queue = self._resolve_queue(context, queue_url=queue_url)1171        self._assert_batch(entries)1172        successful = []1173        failed = []1174        with queue.mutex:1175            for entry in entries:1176                try:1177                    queue.remove(entry["ReceiptHandle"])1178                    successful.append(DeleteMessageBatchResultEntry(Id=entry["Id"]))1179                except Exception as e:1180                    failed.append(1181                        BatchResultErrorEntry(1182                            Id=entry["Id"],1183                            SenderFault=False,1184                            Code=e.__class__.__name__,1185                            Message=str(e),1186                        )1187                    )1188        return DeleteMessageBatchResult(1189            Successful=successful,1190            Failed=failed,1191        )1192    def purge_queue(self, context: RequestContext, queue_url: String) -> None:1193        queue = self._resolve_queue(context, queue_url=queue_url)1194        with queue.mutex:1195            if config.SQS_DELAY_PURGE_RETRY:1196                if queue.purge_timestamp and (queue.purge_timestamp + 60) > time.time():1197                    raise PurgeQueueInProgress(1198                        f"Only one PurgeQueue operation on {queue.name} is allowed every 60 seconds."1199                    )1200            queue.purge_timestamp = time.time()1201            queue.clear()1202    def set_queue_attributes(1203        self, context: RequestContext, queue_url: String, attributes: QueueAttributeMap1204    ) -> None:1205        queue = self._resolve_queue(context, queue_url=queue_url)1206        if not attributes:1207            return1208        queue.validate_queue_attributes(attributes)1209        for k, v in attributes.items():1210            if k in INTERNAL_QUEUE_ATTRIBUTES:1211                raise InvalidAttributeName(f"Unknown Attribute {k}.")1212            queue.attributes[k] = v1213        # Special cases1214        if queue.attributes.get(QueueAttributeName.Policy) == "":1215            del queue.attributes[QueueAttributeName.Policy]1216        redrive_policy = queue.attributes.get(QueueAttributeName.RedrivePolicy)1217        if redrive_policy:1218            _redrive_policy = json.loads(redrive_policy)1219            dl_target_arn = _redrive_policy.get("deadLetterTargetArn")1220            max_receive_count = _redrive_policy.get("maxReceiveCount")1221            # TODO: use the actual AWS responses1222            if not dl_target_arn:1223                raise InvalidParameterValue(1224                    "The required parameter 'deadLetterTargetArn' is missing"1225                )1226            if not max_receive_count:1227                raise InvalidParameterValue("The required parameter 'maxReceiveCount' is missing")1228            try:1229                max_receive_count = int(max_receive_count)1230                valid_count = 1 <= max_receive_count <= 10001231            except ValueError:1232                valid_count = False1233            if not valid_count:1234                raise InvalidParameterValue(1235                    f"Value {redrive_policy} for parameter RedrivePolicy is invalid. Reason: Invalid value for "1236                    f"maxReceiveCount: {max_receive_count}, valid values are from 1 to 1000 both inclusive. "1237                )1238    def tag_queue(self, context: RequestContext, queue_url: String, tags: TagMap) -> None:1239        queue = self._resolve_queue(context, queue_url=queue_url)1240        if not tags:1241            return1242        for k, v in tags.items():1243            queue.tags[k] = v1244    def list_queue_tags(self, context: RequestContext, queue_url: String) -> ListQueueTagsResult:1245        queue = self._resolve_queue(context, queue_url=queue_url)1246        return ListQueueTagsResult(Tags=queue.tags)1247    def untag_queue(self, context: RequestContext, queue_url: String, tag_keys: TagKeyList) -> None:1248        queue = self._resolve_queue(context, queue_url=queue_url)1249        for k in tag_keys:1250            if k in queue.tags:1251                del queue.tags[k]1252    def add_permission(1253        self,1254        context: RequestContext,1255        queue_url: String,1256        label: String,1257        aws_account_ids: AWSAccountIdList,1258        actions: ActionNameList,1259    ) -> None:1260        queue = self._resolve_queue(context, queue_url=queue_url)1261        self._validate_actions(actions)1262        for account_id in aws_account_ids:1263            for action in actions:1264                queue.permissions.add(Permission(label, account_id, action))1265    def remove_permission(self, context: RequestContext, queue_url: String, label: String) -> None:1266        queue = self._resolve_queue(context, queue_url=queue_url)1267        candidates = [p for p in queue.permissions if p.label == label]1268        if candidates:1269            queue.permissions.remove(candidates[0])1270    def _create_message_attributes(1271        self,1272        context: RequestContext,1273        message_system_attributes: MessageBodySystemAttributeMap = None,1274    ) -> Dict[MessageSystemAttributeName, str]:1275        result: Dict[MessageSystemAttributeName, str] = {1276            MessageSystemAttributeName.SenderId: context.account_id,  # not the account ID in AWS1277            MessageSystemAttributeName.SentTimestamp: str(now(millis=True)),1278        }1279        if message_system_attributes is not None:1280            for attr in message_system_attributes:1281                result[attr] = message_system_attributes[attr]["StringValue"]1282        return result1283    def _validate_actions(self, actions: ActionNameList):1284        service = load_service(service=self.service, version=self.version)1285        # FIXME: this is a bit of a heuristic as it will also include actions like "ListQueues" which is not1286        #  associated with an action on a queue1287        valid = list(service.operation_names)1288        valid.append("*")1289        for action in actions:1290            if action not in valid:1291                raise InvalidParameterValue(1292                    f"Value SQS:{action} for parameter ActionName is invalid. Reason: Please refer to the appropriate "1293                    "WSDL for a list of valid actions. "1294                )1295    def _assert_batch(self, batch: List):1296        if not batch:1297            raise EmptyBatchRequest1298        visited = set()1299        for entry in batch:1300            # TODO: InvalidBatchEntryId1301            if entry["Id"] in visited:1302                raise BatchEntryIdsNotDistinct()1303            else:1304                visited.add(entry["Id"])1305    def _assert_valid_batch_size(self, batch: List, max_message_size: int):1306        batch_message_size = sum([len(entry.get("MessageBody").encode("utf8")) for entry in batch])1307        if batch_message_size > max_message_size:1308            error = f"Batch requests cannot be longer than {max_message_size} bytes."1309            error += f" You have sent {batch_message_size} bytes."1310            raise BatchRequestTooLong(error)1311# Method from moto's attribute_md5 of moto/sqs/models.py, separated from the Message Object1312def _create_message_attribute_hash(message_attributes) -> Optional[str]:1313    # To avoid the need to check for dict conformity everytime we invoke this function1314    if not isinstance(message_attributes, dict):1315        return1316    hash = hashlib.md5()1317    for attrName in sorted(message_attributes.keys()):1318        attr_value = message_attributes[attrName]1319        # Encode name...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
