How to use _assert_valid_batch_size method in localstack

Best Python code snippets using the LocalStack Python library

provider.py

Source:provider.py Github

copy

Full Screen

...988 self._assert_batch(entries)989 # check the total batch size first and raise BatchRequestTooLong id > DEFAULT_MAXIMUM_MESSAGE_SIZE.990 # This is checked before any messages in the batch are sent. Raising the exception here should991 # cause error response, rather than batching error results and returning992 self._assert_valid_batch_size(entries, DEFAULT_MAXIMUM_MESSAGE_SIZE)993 successful = []994 failed = []995 with queue.mutex:996 for entry in entries:997 try:998 queue_item = self._put_message(999 queue,1000 context,1001 message_body=entry.get("MessageBody"),1002 delay_seconds=entry.get("DelaySeconds"),1003 message_attributes=entry.get("MessageAttributes"),1004 message_system_attributes=entry.get("MessageSystemAttributes"),1005 message_deduplication_id=entry.get("MessageDeduplicationId"),1006 message_group_id=entry.get("MessageGroupId"),1007 )1008 message = queue_item.message1009 successful.append(1010 SendMessageBatchResultEntry(1011 Id=entry["Id"],1012 MessageId=message.get("MessageId"),1013 MD5OfMessageBody=message.get("MD5OfBody"),1014 MD5OfMessageAttributes=message.get("MD5OfMessageAttributes"),1015 MD5OfMessageSystemAttributes=_create_message_attribute_hash(1016 message.get("message_system_attributes")1017 ),1018 SequenceNumber=queue_item.sequence_number,1019 )1020 )1021 except Exception as e:1022 failed.append(1023 BatchResultErrorEntry(1024 Id=entry["Id"],1025 SenderFault=False,1026 Code=e.__class__.__name__,1027 Message=str(e),1028 )1029 )1030 return SendMessageBatchResult(1031 Successful=successful,1032 Failed=failed,1033 )1034 def _put_message(1035 self,1036 queue: SqsQueue,1037 context: RequestContext,1038 message_body: String,1039 delay_seconds: Integer = None,1040 message_attributes: MessageBodyAttributeMap = None,1041 message_system_attributes: MessageBodySystemAttributeMap = None,1042 message_deduplication_id: String = None,1043 message_group_id: String = None,1044 ) -> SqsMessage:1045 check_message_content(message_body)1046 
check_attributes(message_attributes)1047 check_attributes(message_system_attributes)1048 check_fifo_id(message_deduplication_id)1049 check_fifo_id(message_group_id)1050 message = Message(1051 MessageId=generate_message_id(),1052 MD5OfBody=md5(message_body),1053 Body=message_body,1054 Attributes=self._create_message_attributes(context, message_system_attributes),1055 MD5OfMessageAttributes=_create_message_attribute_hash(message_attributes),1056 MessageAttributes=message_attributes,1057 )1058 return queue.put(1059 message=message,1060 message_deduplication_id=message_deduplication_id,1061 message_group_id=message_group_id,1062 delay_seconds=int(delay_seconds) if delay_seconds is not None else None,1063 )1064 def receive_message(1065 self,1066 context: RequestContext,1067 queue_url: String,1068 attribute_names: AttributeNameList = None,1069 message_attribute_names: MessageAttributeNameList = None,1070 max_number_of_messages: Integer = None,1071 visibility_timeout: Integer = None,1072 wait_time_seconds: Integer = None,1073 receive_request_attempt_id: String = None,1074 ) -> ReceiveMessageResult:1075 queue = self._resolve_queue(context, queue_url=queue_url)1076 if wait_time_seconds is None:1077 wait_time_seconds = queue.wait_time_seconds1078 num = max_number_of_messages or 11079 block = True if wait_time_seconds else False1080 # collect messages1081 messages = []1082 # we chose to always return the maximum possible number of messages, even though AWS will typically return1083 # fewer messages than requested on small queues. 
at some point we could maybe change this to randomly sample1084 # between 1 and max_number_of_messages.1085 # see https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_ReceiveMessage.html1086 while num:1087 try:1088 standard_message = queue.get(1089 block=block, timeout=wait_time_seconds, visibility_timeout=visibility_timeout1090 )1091 msg = standard_message.message1092 except Empty:1093 break1094 # setting block to false guarantees that, if we've already waited before, we don't wait the full time1095 # again in the next iteration if max_number_of_messages is set but there are no more messages in the1096 # queue. see https://github.com/localstack/localstack/issues/58241097 block = False1098 moved_to_dlq = False1099 if (1100 queue.attributes1101 and queue.attributes.get(QueueAttributeName.RedrivePolicy) is not None1102 ):1103 moved_to_dlq = self._dead_letter_check(queue, standard_message, context)1104 if moved_to_dlq:1105 continue1106 msg = copy.deepcopy(msg)1107 message_filter_attributes(msg, attribute_names)1108 message_filter_message_attributes(msg, message_attribute_names)1109 if msg.get("MessageAttributes"):1110 msg["MD5OfMessageAttributes"] = _create_message_attribute_hash(1111 msg["MessageAttributes"]1112 )1113 else:1114 # delete the value that was computed when creating the message1115 msg.pop("MD5OfMessageAttributes")1116 # add message to result1117 messages.append(msg)1118 num -= 11119 # TODO: how does receiving behave if the queue was deleted in the meantime?1120 return ReceiveMessageResult(Messages=messages)1121 def _dead_letter_check(1122 self, queue: SqsQueue, std_m: SqsMessage, context: RequestContext1123 ) -> bool:1124 redrive_policy = json.loads(queue.attributes.get(QueueAttributeName.RedrivePolicy))1125 # TODO: include the names of the dictionary sub - attributes in the autogenerated code?1126 max_receive_count = int(redrive_policy["maxReceiveCount"])1127 if std_m.receive_times > max_receive_count:1128 dead_letter_target_arn = 
redrive_policy["deadLetterTargetArn"]1129 dl_queue = self._require_queue_by_arn(context, dead_letter_target_arn)1130 # TODO: this needs to be atomic?1131 dead_message = std_m.message1132 dl_queue.put(1133 message=dead_message,1134 message_deduplication_id=std_m.message_deduplication_id,1135 message_group_id=std_m.message_group_id,1136 )1137 queue.remove(std_m.message["ReceiptHandle"])1138 return True1139 else:1140 return False1141 def list_dead_letter_source_queues(1142 self,1143 context: RequestContext,1144 queue_url: String,1145 next_token: Token = None,1146 max_results: BoxedInteger = None,1147 ) -> ListDeadLetterSourceQueuesResult:1148 urls = []1149 backend = SqsBackend.get(context.region)1150 dead_letter_queue = self._resolve_queue(context, queue_url=queue_url)1151 for queue in backend.queues.values():1152 policy = queue.attributes.get(QueueAttributeName.RedrivePolicy)1153 if policy:1154 policy = json.loads(policy)1155 dlq_arn = policy.get("deadLetterTargetArn")1156 if dlq_arn == dead_letter_queue.arn:1157 urls.append(queue.url(context))1158 return ListDeadLetterSourceQueuesResult(queueUrls=urls)1159 def delete_message(1160 self, context: RequestContext, queue_url: String, receipt_handle: String1161 ) -> None:1162 queue = self._resolve_queue(context, queue_url=queue_url)1163 queue.remove(receipt_handle)1164 def delete_message_batch(1165 self,1166 context: RequestContext,1167 queue_url: String,1168 entries: DeleteMessageBatchRequestEntryList,1169 ) -> DeleteMessageBatchResult:1170 queue = self._resolve_queue(context, queue_url=queue_url)1171 self._assert_batch(entries)1172 successful = []1173 failed = []1174 with queue.mutex:1175 for entry in entries:1176 try:1177 queue.remove(entry["ReceiptHandle"])1178 successful.append(DeleteMessageBatchResultEntry(Id=entry["Id"]))1179 except Exception as e:1180 failed.append(1181 BatchResultErrorEntry(1182 Id=entry["Id"],1183 SenderFault=False,1184 Code=e.__class__.__name__,1185 Message=str(e),1186 )1187 )1188 return 
DeleteMessageBatchResult(1189 Successful=successful,1190 Failed=failed,1191 )1192 def purge_queue(self, context: RequestContext, queue_url: String) -> None:1193 queue = self._resolve_queue(context, queue_url=queue_url)1194 with queue.mutex:1195 if config.SQS_DELAY_PURGE_RETRY:1196 if queue.purge_timestamp and (queue.purge_timestamp + 60) > time.time():1197 raise PurgeQueueInProgress(1198 f"Only one PurgeQueue operation on {queue.name} is allowed every 60 seconds."1199 )1200 queue.purge_timestamp = time.time()1201 queue.clear()1202 def set_queue_attributes(1203 self, context: RequestContext, queue_url: String, attributes: QueueAttributeMap1204 ) -> None:1205 queue = self._resolve_queue(context, queue_url=queue_url)1206 if not attributes:1207 return1208 queue.validate_queue_attributes(attributes)1209 for k, v in attributes.items():1210 if k in INTERNAL_QUEUE_ATTRIBUTES:1211 raise InvalidAttributeName(f"Unknown Attribute {k}.")1212 queue.attributes[k] = v1213 # Special cases1214 if queue.attributes.get(QueueAttributeName.Policy) == "":1215 del queue.attributes[QueueAttributeName.Policy]1216 redrive_policy = queue.attributes.get(QueueAttributeName.RedrivePolicy)1217 if redrive_policy:1218 _redrive_policy = json.loads(redrive_policy)1219 dl_target_arn = _redrive_policy.get("deadLetterTargetArn")1220 max_receive_count = _redrive_policy.get("maxReceiveCount")1221 # TODO: use the actual AWS responses1222 if not dl_target_arn:1223 raise InvalidParameterValue(1224 "The required parameter 'deadLetterTargetArn' is missing"1225 )1226 if not max_receive_count:1227 raise InvalidParameterValue("The required parameter 'maxReceiveCount' is missing")1228 try:1229 max_receive_count = int(max_receive_count)1230 valid_count = 1 <= max_receive_count <= 10001231 except ValueError:1232 valid_count = False1233 if not valid_count:1234 raise InvalidParameterValue(1235 f"Value {redrive_policy} for parameter RedrivePolicy is invalid. 
Reason: Invalid value for "1236 f"maxReceiveCount: {max_receive_count}, valid values are from 1 to 1000 both inclusive. "1237 )1238 def tag_queue(self, context: RequestContext, queue_url: String, tags: TagMap) -> None:1239 queue = self._resolve_queue(context, queue_url=queue_url)1240 if not tags:1241 return1242 for k, v in tags.items():1243 queue.tags[k] = v1244 def list_queue_tags(self, context: RequestContext, queue_url: String) -> ListQueueTagsResult:1245 queue = self._resolve_queue(context, queue_url=queue_url)1246 return ListQueueTagsResult(Tags=queue.tags)1247 def untag_queue(self, context: RequestContext, queue_url: String, tag_keys: TagKeyList) -> None:1248 queue = self._resolve_queue(context, queue_url=queue_url)1249 for k in tag_keys:1250 if k in queue.tags:1251 del queue.tags[k]1252 def add_permission(1253 self,1254 context: RequestContext,1255 queue_url: String,1256 label: String,1257 aws_account_ids: AWSAccountIdList,1258 actions: ActionNameList,1259 ) -> None:1260 queue = self._resolve_queue(context, queue_url=queue_url)1261 self._validate_actions(actions)1262 for account_id in aws_account_ids:1263 for action in actions:1264 queue.permissions.add(Permission(label, account_id, action))1265 def remove_permission(self, context: RequestContext, queue_url: String, label: String) -> None:1266 queue = self._resolve_queue(context, queue_url=queue_url)1267 candidates = [p for p in queue.permissions if p.label == label]1268 if candidates:1269 queue.permissions.remove(candidates[0])1270 def _create_message_attributes(1271 self,1272 context: RequestContext,1273 message_system_attributes: MessageBodySystemAttributeMap = None,1274 ) -> Dict[MessageSystemAttributeName, str]:1275 result: Dict[MessageSystemAttributeName, str] = {1276 MessageSystemAttributeName.SenderId: context.account_id, # not the account ID in AWS1277 MessageSystemAttributeName.SentTimestamp: str(now(millis=True)),1278 }1279 if message_system_attributes is not None:1280 for attr in 
message_system_attributes:1281 result[attr] = message_system_attributes[attr]["StringValue"]1282 return result1283 def _validate_actions(self, actions: ActionNameList):1284 service = load_service(service=self.service, version=self.version)1285 # FIXME: this is a bit of a heuristic as it will also include actions like "ListQueues" which is not1286 # associated with an action on a queue1287 valid = list(service.operation_names)1288 valid.append("*")1289 for action in actions:1290 if action not in valid:1291 raise InvalidParameterValue(1292 f"Value SQS:{action} for parameter ActionName is invalid. Reason: Please refer to the appropriate "1293 "WSDL for a list of valid actions. "1294 )1295 def _assert_batch(self, batch: List):1296 if not batch:1297 raise EmptyBatchRequest1298 visited = set()1299 for entry in batch:1300 # TODO: InvalidBatchEntryId1301 if entry["Id"] in visited:1302 raise BatchEntryIdsNotDistinct()1303 else:1304 visited.add(entry["Id"])1305 def _assert_valid_batch_size(self, batch: List, max_message_size: int):1306 batch_message_size = sum([len(entry.get("MessageBody").encode("utf8")) for entry in batch])1307 if batch_message_size > max_message_size:1308 error = f"Batch requests cannot be longer than {max_message_size} bytes."1309 error += f" You have sent {batch_message_size} bytes."1310 raise BatchRequestTooLong(error)1311# Method from moto's attribute_md5 of moto/sqs/models.py, separated from the Message Object1312def _create_message_attribute_hash(message_attributes) -> Optional[str]:1313 # To avoid the need to check for dict conformity everytime we invoke this function1314 if not isinstance(message_attributes, dict):1315 return1316 hash = hashlib.md5()1317 for attrName in sorted(message_attributes.keys()):1318 attr_value = message_attributes[attrName]1319 # Encode name...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub. Right from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles a list of step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful