Best Python code snippet using localstack_python
sqs_event_source_listener.py
Source:sqs_event_source_listener.py  
...105            if report_partial_failures:106                valid_message_ids = [r["messageId"] for r in records]107                # collect messages to delete (= the ones that were processed successfully)108                try:109                    if messages_to_keep := parse_batch_item_failures(110                        result, valid_message_ids=valid_message_ids111                    ):112                        # unless there is an exception or the parse result is empty, only delete those messages that113                        # are not part of the partial failure report.114                        messages_to_delete = [115                            message_id116                            for message_id in valid_message_ids117                            if message_id not in messages_to_keep118                        ]119                    else:120                        # otherwise delete all messages121                        messages_to_delete = valid_message_ids122                    LOG.debug(123                        "Lambda partial SQS batch failure report: ok=%s, failed=%s",124                        messages_to_delete,125                        messages_to_keep,126                    )127                except Exception as e:128                    LOG.error(129                        "Error while parsing batchItemFailures from lambda response %s: %s. 
"130                        "Treating the batch as complete failure.",131                        result.result,132                        e,133                    )134                    return135                entries = [136                    {"Id": r["receiptHandle"], "ReceiptHandle": r["receiptHandle"]}137                    for r in records138                    if r["messageId"] in messages_to_delete139                ]140            else:141                entries = [142                    {"Id": r["receiptHandle"], "ReceiptHandle": r["receiptHandle"]} for r in records143                ]144            try:145                sqs_client.delete_message_batch(QueueUrl=queue_url, Entries=entries)146            except Exception as e:147                LOG.info(148                    "Unable to delete Lambda events from SQS queue "149                    + "(please check SQS visibility timeout settings): %s - %s" % (entries, e)150                )151        for msg in messages:152            message_attrs = message_attributes_to_lower(msg.get("MessageAttributes"))153            record = {154                "body": msg.get("Body", "MessageBody"),155                "receiptHandle": msg.get("ReceiptHandle"),156                "md5OfBody": msg.get("MD5OfBody") or msg.get("MD5OfMessageBody"),157                "eventSourceARN": queue_arn,158                "eventSource": lambda_executors.EVENT_SOURCE_SQS,159                "awsRegion": region,160                "messageId": msg["MessageId"],161                "attributes": msg.get("Attributes", {}),162                "messageAttributes": message_attrs,163            }164            if md5OfMessageAttributes := msg.get("MD5OfMessageAttributes"):165                record["md5OfMessageAttributes"] = md5OfMessageAttributes166            records.append(record)167        event_filter_criterias = get_lambda_event_filters_for_arn(lambda_arn, queue_arn)168        if len(event_filter_criterias) > 0:169            # convert to 
json for filtering170            for record in records:171                try:172                    record["body"] = json.loads(record["body"])173                except json.JSONDecodeError:174                    LOG.warning(175                        f"Unable to convert record '{record['body']}' to json... Record might be dropped."176                    )177            records = filter_stream_records(records, event_filter_criterias)178            # convert them back179            for record in records:180                record["body"] = (181                    json.dumps(record["body"])182                    if not isinstance(record["body"], str)183                    else record["body"]184                )185        # all messages were filtered out186        if not len(records) > 0:187            return True188        event = {"Records": records}189        res = run_lambda(190            func_arn=lambda_arn,191            event=event,192            context={},193            asynchronous=True,194            callback=delete_messages,195        )196        if isinstance(res, InvocationResult):197            status_code = getattr(res.result, "status_code", 0)198            if status_code >= 400:199                return False200        return True201def parse_batch_item_failures(result: InvocationResult, valid_message_ids: List[str]) -> List[str]:202    """203    Parses a lambda responses as a partial batch failure response, that looks something like this::204        {205          "batchItemFailures": [206                {207                    "itemIdentifier": "id2"208                },209                {210                    "itemIdentifier": "id4"211                }212            ]213        }214    If the response returns an empty list, then the batch should be considered as a complete success. If an exception215    is raised, the batch should be considered a complete failure....Learn to execute automation testing from scratch with LambdaTest Learning Hub. 
It covers everything from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
