Best Python code snippet using localstack_python
provider.py
Source:provider.py  
...1046            if moved_to_dlq:1047                continue1048            msg = copy.deepcopy(msg)1049            message_filter_attributes(msg, attribute_names)1050            message_filter_message_attributes(msg, message_attribute_names)1051            if msg.get("MessageAttributes"):1052                msg["MD5OfMessageAttributes"] = _create_message_attribute_hash(1053                    msg["MessageAttributes"]1054                )1055            else:1056                # delete the value that was computed when creating the message1057                msg.pop("MD5OfMessageAttributes")1058            # add message to result1059            messages.append(msg)1060            num -= 11061        # TODO: how does receiving behave if the queue was deleted in the meantime?1062        return ReceiveMessageResult(Messages=messages)1063    def _dead_letter_check(1064        self, queue: SqsQueue, std_m: SqsMessage, context: RequestContext1065    ) -> bool:1066        redrive_policy = json.loads(queue.attributes.get(QueueAttributeName.RedrivePolicy))1067        # TODO: include the names of the dictionary sub - attributes in the autogenerated code?1068        max_receive_count = int(redrive_policy["maxReceiveCount"])1069        if std_m.receive_times > max_receive_count:1070            dead_letter_target_arn = redrive_policy["deadLetterTargetArn"]1071            dl_queue = self._require_queue_by_arn(context, dead_letter_target_arn)1072            # TODO: this needs to be atomic?1073            dead_message = std_m.message1074            dl_queue.put(1075                message=dead_message,1076                message_deduplication_id=std_m.message_deduplication_id,1077                message_group_id=std_m.message_group_id,1078            )1079            queue.remove(std_m.message["ReceiptHandle"])1080            return True1081        else:1082            return False1083    def list_dead_letter_source_queues(1084        self,1085        context: RequestContext,1086        queue_url: String,1087        next_token: Token = None,1088        max_results: BoxedInteger = None,1089    ) -> ListDeadLetterSourceQueuesResult:1090        urls = []1091        backend = SqsBackend.get(context.region)1092        dead_letter_queue = self._resolve_queue(context, queue_url=queue_url)1093        for queue in backend.queues.values():1094            policy = queue.attributes.get(QueueAttributeName.RedrivePolicy)1095            if policy:1096                policy = json.loads(policy)1097                dlq_arn = policy.get("deadLetterTargetArn")1098                if dlq_arn == dead_letter_queue.arn:1099                    urls.append(queue.url(context))1100        return ListDeadLetterSourceQueuesResult(queueUrls=urls)1101    def delete_message(1102        self, context: RequestContext, queue_url: String, receipt_handle: String1103    ) -> None:1104        queue = self._resolve_queue(context, queue_url=queue_url)1105        queue.remove(receipt_handle)1106    def delete_message_batch(1107        self,1108        context: RequestContext,1109        queue_url: String,1110        entries: DeleteMessageBatchRequestEntryList,1111    ) -> DeleteMessageBatchResult:1112        queue = self._resolve_queue(context, queue_url=queue_url)1113        self._assert_batch(entries)1114        successful = []1115        failed = []1116        with queue.mutex:1117            for entry in entries:1118                try:1119                    queue.remove(entry["ReceiptHandle"])1120                    successful.append(DeleteMessageBatchResultEntry(Id=entry["Id"]))1121                except Exception as e:1122                    failed.append(1123                        BatchResultErrorEntry(1124                            Id=entry["Id"],1125                            SenderFault=False,1126                            Code=e.__class__.__name__,1127                            Message=str(e),1128                        )1129                    )1130        return DeleteMessageBatchResult(1131            Successful=successful,1132            Failed=failed,1133        )1134    def purge_queue(self, context: RequestContext, queue_url: String) -> None:1135        queue = self._resolve_queue(context, queue_url=queue_url)1136        with self._mutex:1137            # FIXME: use queue-specific locks1138            if queue.purge_in_progress:1139                raise PurgeQueueInProgress()1140            queue.purge_in_progress = True1141        # TODO: how do other methods behave when purge is in progress?1142        try:1143            while True:1144                queue.visible.get_nowait()1145        except Empty:1146            return1147        finally:1148            queue.purge_in_progress = False1149    def set_queue_attributes(1150        self, context: RequestContext, queue_url: String, attributes: QueueAttributeMap1151    ) -> None:1152        queue = self._resolve_queue(context, queue_url=queue_url)1153        if not attributes:1154            return1155        queue.validate_queue_attributes(attributes)1156        for k, v in attributes.items():1157            if k in INTERNAL_QUEUE_ATTRIBUTES:1158                raise InvalidAttributeName(f"Unknown Attribute {k}.")1159            queue.attributes[k] = v1160        # Special cases1161        if queue.attributes.get(QueueAttributeName.Policy) == "":1162            del queue.attributes[QueueAttributeName.Policy]1163        redrive_policy = queue.attributes.get(QueueAttributeName.RedrivePolicy)1164        if redrive_policy:1165            _redrive_policy = json.loads(redrive_policy)1166            dl_target_arn = _redrive_policy.get("deadLetterTargetArn")1167            max_receive_count = _redrive_policy.get("maxReceiveCount")1168            # TODO: use the actual AWS responses1169            if not dl_target_arn:1170                raise InvalidParameterValue(1171                    "The required parameter 'deadLetterTargetArn' is missing"1172                )1173            if not max_receive_count:1174                raise InvalidParameterValue("The required parameter 'maxReceiveCount' is missing")1175            try:1176                max_receive_count = int(max_receive_count)1177                valid_count = 1 <= max_receive_count <= 10001178            except ValueError:1179                valid_count = False1180            if not valid_count:1181                raise InvalidParameterValue(1182                    f"Value {redrive_policy} for parameter RedrivePolicy is invalid. Reason: Invalid value for "1183                    f"maxReceiveCount: {max_receive_count}, valid values are from 1 to 1000 both inclusive. "1184                )1185    def tag_queue(self, context: RequestContext, queue_url: String, tags: TagMap) -> None:1186        queue = self._resolve_queue(context, queue_url=queue_url)1187        if not tags:1188            return1189        for k, v in tags.items():1190            queue.tags[k] = v1191    def list_queue_tags(self, context: RequestContext, queue_url: String) -> ListQueueTagsResult:1192        queue = self._resolve_queue(context, queue_url=queue_url)1193        return ListQueueTagsResult(Tags=queue.tags)1194    def untag_queue(self, context: RequestContext, queue_url: String, tag_keys: TagKeyList) -> None:1195        queue = self._resolve_queue(context, queue_url=queue_url)1196        for k in tag_keys:1197            if k in queue.tags:1198                del queue.tags[k]1199    def add_permission(1200        self,1201        context: RequestContext,1202        queue_url: String,1203        label: String,1204        aws_account_ids: AWSAccountIdList,1205        actions: ActionNameList,1206    ) -> None:1207        queue = self._resolve_queue(context, queue_url=queue_url)1208        self._validate_actions(actions)1209        for account_id in aws_account_ids:1210            for action in actions:1211                queue.permissions.add(Permission(label, account_id, action))1212    def remove_permission(self, context: RequestContext, queue_url: String, label: String) -> None:1213        queue = self._resolve_queue(context, queue_url=queue_url)1214        candidates = [p for p in queue.permissions if p.label == label]1215        if candidates:1216            queue.permissions.remove(candidates[0])1217    def _create_message_attributes(1218        self,1219        context: RequestContext,1220        message_system_attributes: MessageBodySystemAttributeMap = None,1221    ) -> Dict[MessageSystemAttributeName, str]:1222        result: Dict[MessageSystemAttributeName, str] = {1223            MessageSystemAttributeName.SenderId: context.account_id,  # not the account ID in AWS1224            MessageSystemAttributeName.SentTimestamp: str(now(millis=True)),1225        }1226        if message_system_attributes is not None:1227            for attr in message_system_attributes:1228                result[attr] = message_system_attributes[attr]["StringValue"]1229        return result1230    def _validate_queue_attributes(self, attributes: QueueAttributeMap):1231        valid = [k[1] for k in inspect.getmembers(QueueAttributeName)]1232        for k in attributes.keys():1233            if k not in valid or k in INTERNAL_QUEUE_ATTRIBUTES:1234                raise InvalidAttributeName(f"Unknown Attribute {k}.")1235    def _validate_actions(self, actions: ActionNameList):1236        service = load_service(service=self.service, version=self.version)1237        # FIXME: this is a bit of a heuristic as it will also include actions like "ListQueues" which is not1238        #  associated with an action on a queue1239        valid = list(service.operation_names)1240        valid.append("*")1241        for action in actions:1242            if action not in valid:1243                raise InvalidParameterValue(1244                    f"Value SQS:{action} for parameter ActionName is invalid. Reason: Please refer to the appropriate "1245                    "WSDL for a list of valid actions. "1246                )1247    def _assert_batch(self, batch: List):1248        if not batch:1249            raise EmptyBatchRequest1250        visited = set()1251        for entry in batch:1252            # TODO: InvalidBatchEntryId1253            if entry["Id"] in visited:1254                raise BatchEntryIdsNotDistinct()1255            else:1256                visited.add(entry["Id"])1257def _create_mock_sequence_number():1258    return "".join(random.choice(string.digits) for _ in range(20))1259# Method from moto's attribute_md5 of moto/sqs/models.py, separated from the Message Object1260def _create_message_attribute_hash(message_attributes) -> Optional[str]:1261    # To avoid the need to check for dict conformity everytime we invoke this function1262    if not isinstance(message_attributes, dict):1263        return1264    hash = hashlib.md5()1265    for attrName in sorted(message_attributes.keys()):1266        attr_value = message_attributes[attrName]1267        # Encode name1268        MotoMessage.update_binary_length_and_value(hash, MotoMessage.utf8(attrName))1269        # Encode data type1270        MotoMessage.update_binary_length_and_value(hash, MotoMessage.utf8(attr_value["DataType"]))1271        # Encode transport type and value1272        if attr_value.get("StringValue"):1273            hash.update(bytearray([STRING_TYPE_FIELD_INDEX]))1274            MotoMessage.update_binary_length_and_value(1275                hash, MotoMessage.utf8(attr_value.get("StringValue"))1276            )1277        elif attr_value.get("BinaryValue"):1278            hash.update(bytearray([BINARY_TYPE_FIELD_INDEX]))1279            decoded_binary_value = base64.b64decode(attr_value.get("BinaryValue"))1280            MotoMessage.update_binary_length_and_value(hash, decoded_binary_value)1281        # string_list_value, binary_list_value type is not implemented, reserved for the future use.1282        # See https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_MessageAttributeValue.html1283    return hash.hexdigest()1284def get_queue_name_from_url(queue_url: str) -> str:1285    return queue_url.rstrip("/").split("/")[-1]1286def resolve_queue_name(1287    context: RequestContext, queue_name: Optional[str] = None, queue_url: Optional[str] = None1288) -> str:1289    """1290    Resolves a queue name from the given information.1291    :param context: the request context, used for getting region and account_id, and optionally the queue_url1292    :param queue_name: the queue name (if this is set, then this will be used for the key)1293    :param queue_url: the queue url (if name is not set, this will be used to determine the queue name)1294    :return: the queue name describing the queue being requested1295    """1296    if not queue_name:1297        if queue_url:1298            queue_name = get_queue_name_from_url(queue_url)1299        else:1300            queue_name = get_queue_name_from_url(context.request.base_url)1301    return queue_name1302def message_filter_attributes(message: Message, names: Optional[AttributeNameList]):1303    """1304    Utility function filter from the given message (in-place) the system attributes from the given list. It will1305    apply all rules according to:1306    https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sqs.html#SQS.Client.receive_message.1307    :param message: The message to filter (it will be modified)1308    :param names: the attributes names/filters1309    """1310    if "Attributes" not in message:1311        return1312    if not names:1313        del message["Attributes"]1314        return1315    if "All" in names:1316        return1317    for k in list(message["Attributes"].keys()):1318        if k not in names:1319            del message["Attributes"][k]1320def message_filter_message_attributes(message: Message, names: Optional[MessageAttributeNameList]):1321    """1322    Utility function filter from the given message (in-place) the message attributes from the given list. It will1323    apply all rules according to:1324    https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sqs.html#SQS.Client.receive_message.1325    :param message: The message to filter (it will be modified)1326    :param names: the attributes names/filters (can be 'All', '.*', or prefix filters like 'Foo.*')1327    """1328    if not message.get("MessageAttributes"):1329        return1330    if not names:1331        del message["MessageAttributes"]1332        return1333    if "All" in names or ".*" in names:1334        return...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
