How to use message_filter_attributes method in localstack

Best Python code snippet using localstack_python

provider.py

Source:provider.py Github

copy

Full Screen

...1045 moved_to_dlq = self._dead_letter_check(queue, standard_message, context)1046 if moved_to_dlq:1047 continue1048 msg = copy.deepcopy(msg)1049 message_filter_attributes(msg, attribute_names)1050 message_filter_message_attributes(msg, message_attribute_names)1051 if msg.get("MessageAttributes"):1052 msg["MD5OfMessageAttributes"] = _create_message_attribute_hash(1053 msg["MessageAttributes"]1054 )1055 else:1056 # delete the value that was computed when creating the message1057 msg.pop("MD5OfMessageAttributes")1058 # add message to result1059 messages.append(msg)1060 num -= 11061 # TODO: how does receiving behave if the queue was deleted in the meantime?1062 return ReceiveMessageResult(Messages=messages)1063 def _dead_letter_check(1064 self, queue: SqsQueue, std_m: SqsMessage, context: RequestContext1065 ) -> bool:1066 redrive_policy = json.loads(queue.attributes.get(QueueAttributeName.RedrivePolicy))1067 # TODO: include the names of the dictionary sub - attributes in the autogenerated code?1068 max_receive_count = int(redrive_policy["maxReceiveCount"])1069 if std_m.receive_times > max_receive_count:1070 dead_letter_target_arn = redrive_policy["deadLetterTargetArn"]1071 dl_queue = self._require_queue_by_arn(context, dead_letter_target_arn)1072 # TODO: this needs to be atomic?1073 dead_message = std_m.message1074 dl_queue.put(1075 message=dead_message,1076 message_deduplication_id=std_m.message_deduplication_id,1077 message_group_id=std_m.message_group_id,1078 )1079 queue.remove(std_m.message["ReceiptHandle"])1080 return True1081 else:1082 return False1083 def list_dead_letter_source_queues(1084 self,1085 context: RequestContext,1086 queue_url: String,1087 next_token: Token = None,1088 max_results: BoxedInteger = None,1089 ) -> ListDeadLetterSourceQueuesResult:1090 urls = []1091 backend = SqsBackend.get(context.region)1092 dead_letter_queue = self._resolve_queue(context, queue_url=queue_url)1093 for queue in backend.queues.values():1094 policy = queue.attributes.get(QueueAttributeName.RedrivePolicy)1095 if policy:1096 policy = json.loads(policy)1097 dlq_arn = policy.get("deadLetterTargetArn")1098 if dlq_arn == dead_letter_queue.arn:1099 urls.append(queue.url(context))1100 return ListDeadLetterSourceQueuesResult(queueUrls=urls)1101 def delete_message(1102 self, context: RequestContext, queue_url: String, receipt_handle: String1103 ) -> None:1104 queue = self._resolve_queue(context, queue_url=queue_url)1105 queue.remove(receipt_handle)1106 def delete_message_batch(1107 self,1108 context: RequestContext,1109 queue_url: String,1110 entries: DeleteMessageBatchRequestEntryList,1111 ) -> DeleteMessageBatchResult:1112 queue = self._resolve_queue(context, queue_url=queue_url)1113 self._assert_batch(entries)1114 successful = []1115 failed = []1116 with queue.mutex:1117 for entry in entries:1118 try:1119 queue.remove(entry["ReceiptHandle"])1120 successful.append(DeleteMessageBatchResultEntry(Id=entry["Id"]))1121 except Exception as e:1122 failed.append(1123 BatchResultErrorEntry(1124 Id=entry["Id"],1125 SenderFault=False,1126 Code=e.__class__.__name__,1127 Message=str(e),1128 )1129 )1130 return DeleteMessageBatchResult(1131 Successful=successful,1132 Failed=failed,1133 )1134 def purge_queue(self, context: RequestContext, queue_url: String) -> None:1135 queue = self._resolve_queue(context, queue_url=queue_url)1136 with self._mutex:1137 # FIXME: use queue-specific locks1138 if queue.purge_in_progress:1139 raise PurgeQueueInProgress()1140 queue.purge_in_progress = True1141 # TODO: how do other methods behave when purge is in progress?1142 try:1143 while True:1144 queue.visible.get_nowait()1145 except Empty:1146 return1147 finally:1148 queue.purge_in_progress = False1149 def set_queue_attributes(1150 self, context: RequestContext, queue_url: String, attributes: QueueAttributeMap1151 ) -> None:1152 queue = self._resolve_queue(context, queue_url=queue_url)1153 if not attributes:1154 return1155 queue.validate_queue_attributes(attributes)1156 for k, v in attributes.items():1157 if k in INTERNAL_QUEUE_ATTRIBUTES:1158 raise InvalidAttributeName(f"Unknown Attribute {k}.")1159 queue.attributes[k] = v1160 # Special cases1161 if queue.attributes.get(QueueAttributeName.Policy) == "":1162 del queue.attributes[QueueAttributeName.Policy]1163 redrive_policy = queue.attributes.get(QueueAttributeName.RedrivePolicy)1164 if redrive_policy:1165 _redrive_policy = json.loads(redrive_policy)1166 dl_target_arn = _redrive_policy.get("deadLetterTargetArn")1167 max_receive_count = _redrive_policy.get("maxReceiveCount")1168 # TODO: use the actual AWS responses1169 if not dl_target_arn:1170 raise InvalidParameterValue(1171 "The required parameter 'deadLetterTargetArn' is missing"1172 )1173 if not max_receive_count:1174 raise InvalidParameterValue("The required parameter 'maxReceiveCount' is missing")1175 try:1176 max_receive_count = int(max_receive_count)1177 valid_count = 1 <= max_receive_count <= 10001178 except ValueError:1179 valid_count = False1180 if not valid_count:1181 raise InvalidParameterValue(1182 f"Value {redrive_policy} for parameter RedrivePolicy is invalid. Reason: Invalid value for "1183 f"maxReceiveCount: {max_receive_count}, valid values are from 1 to 1000 both inclusive. "1184 )1185 def tag_queue(self, context: RequestContext, queue_url: String, tags: TagMap) -> None:1186 queue = self._resolve_queue(context, queue_url=queue_url)1187 if not tags:1188 return1189 for k, v in tags.items():1190 queue.tags[k] = v1191 def list_queue_tags(self, context: RequestContext, queue_url: String) -> ListQueueTagsResult:1192 queue = self._resolve_queue(context, queue_url=queue_url)1193 return ListQueueTagsResult(Tags=queue.tags)1194 def untag_queue(self, context: RequestContext, queue_url: String, tag_keys: TagKeyList) -> None:1195 queue = self._resolve_queue(context, queue_url=queue_url)1196 for k in tag_keys:1197 if k in queue.tags:1198 del queue.tags[k]1199 def add_permission(1200 self,1201 context: RequestContext,1202 queue_url: String,1203 label: String,1204 aws_account_ids: AWSAccountIdList,1205 actions: ActionNameList,1206 ) -> None:1207 queue = self._resolve_queue(context, queue_url=queue_url)1208 self._validate_actions(actions)1209 for account_id in aws_account_ids:1210 for action in actions:1211 queue.permissions.add(Permission(label, account_id, action))1212 def remove_permission(self, context: RequestContext, queue_url: String, label: String) -> None:1213 queue = self._resolve_queue(context, queue_url=queue_url)1214 candidates = [p for p in queue.permissions if p.label == label]1215 if candidates:1216 queue.permissions.remove(candidates[0])1217 def _create_message_attributes(1218 self,1219 context: RequestContext,1220 message_system_attributes: MessageBodySystemAttributeMap = None,1221 ) -> Dict[MessageSystemAttributeName, str]:1222 result: Dict[MessageSystemAttributeName, str] = {1223 MessageSystemAttributeName.SenderId: context.account_id, # not the account ID in AWS1224 MessageSystemAttributeName.SentTimestamp: str(now(millis=True)),1225 }1226 if message_system_attributes is not None:1227 for attr in message_system_attributes:1228 result[attr] = message_system_attributes[attr]["StringValue"]1229 return result1230 def _validate_queue_attributes(self, attributes: QueueAttributeMap):1231 valid = [k[1] for k in inspect.getmembers(QueueAttributeName)]1232 for k in attributes.keys():1233 if k not in valid or k in INTERNAL_QUEUE_ATTRIBUTES:1234 raise InvalidAttributeName(f"Unknown Attribute {k}.")1235 def _validate_actions(self, actions: ActionNameList):1236 service = load_service(service=self.service, version=self.version)1237 # FIXME: this is a bit of a heuristic as it will also include actions like "ListQueues" which is not1238 # associated with an action on a queue1239 valid = list(service.operation_names)1240 valid.append("*")1241 for action in actions:1242 if action not in valid:1243 raise InvalidParameterValue(1244 f"Value SQS:{action} for parameter ActionName is invalid. Reason: Please refer to the appropriate "1245 "WSDL for a list of valid actions. "1246 )1247 def _assert_batch(self, batch: List):1248 if not batch:1249 raise EmptyBatchRequest1250 visited = set()1251 for entry in batch:1252 # TODO: InvalidBatchEntryId1253 if entry["Id"] in visited:1254 raise BatchEntryIdsNotDistinct()1255 else:1256 visited.add(entry["Id"])1257def _create_mock_sequence_number():1258 return "".join(random.choice(string.digits) for _ in range(20))1259# Method from moto's attribute_md5 of moto/sqs/models.py, separated from the Message Object1260def _create_message_attribute_hash(message_attributes) -> Optional[str]:1261 # To avoid the need to check for dict conformity everytime we invoke this function1262 if not isinstance(message_attributes, dict):1263 return1264 hash = hashlib.md5()1265 for attrName in sorted(message_attributes.keys()):1266 attr_value = message_attributes[attrName]1267 # Encode name1268 MotoMessage.update_binary_length_and_value(hash, MotoMessage.utf8(attrName))1269 # Encode data type1270 MotoMessage.update_binary_length_and_value(hash, MotoMessage.utf8(attr_value["DataType"]))1271 # Encode transport type and value1272 if attr_value.get("StringValue"):1273 hash.update(bytearray([STRING_TYPE_FIELD_INDEX]))1274 MotoMessage.update_binary_length_and_value(1275 hash, MotoMessage.utf8(attr_value.get("StringValue"))1276 )1277 elif attr_value.get("BinaryValue"):1278 hash.update(bytearray([BINARY_TYPE_FIELD_INDEX]))1279 decoded_binary_value = base64.b64decode(attr_value.get("BinaryValue"))1280 MotoMessage.update_binary_length_and_value(hash, decoded_binary_value)1281 # string_list_value, binary_list_value type is not implemented, reserved for the future use.1282 # See https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_MessageAttributeValue.html1283 return hash.hexdigest()1284def get_queue_name_from_url(queue_url: str) -> str:1285 return queue_url.rstrip("/").split("/")[-1]1286def resolve_queue_name(1287 context: RequestContext, queue_name: Optional[str] = None, queue_url: Optional[str] = None1288) -> str:1289 """1290 Resolves a queue name from the given information.1291 :param context: the request context, used for getting region and account_id, and optionally the queue_url1292 :param queue_name: the queue name (if this is set, then this will be used for the key)1293 :param queue_url: the queue url (if name is not set, this will be used to determine the queue name)1294 :return: the queue name describing the queue being requested1295 """1296 if not queue_name:1297 if queue_url:1298 queue_name = get_queue_name_from_url(queue_url)1299 else:1300 queue_name = get_queue_name_from_url(context.request.base_url)1301 return queue_name1302def message_filter_attributes(message: Message, names: Optional[AttributeNameList]):1303 """1304 Utility function filter from the given message (in-place) the system attributes from the given list. It will1305 apply all rules according to:1306 https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sqs.html#SQS.Client.receive_message.1307 :param message: The message to filter (it will be modified)1308 :param names: the attributes names/filters1309 """1310 if "Attributes" not in message:1311 return1312 if not names:1313 del message["Attributes"]1314 return1315 if "All" in names:1316 return...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful