Best Python code snippet using localstack_python
test_sqs.py
Source:test_sqs.py  
...145                    }146                )147            return {'Messages': messages}148        mock_conn.return_value.receive_message.side_effect = mock_receive_message149        def mock_delete_message_batch(**kwargs):150            return {'Successful'}151        mock_conn.return_value.delete_message_batch.side_effect = mock_delete_message_batch152        # Test that messages are filtered153        self.sensor.message_filtering = 'literal'154        self.sensor.message_filtering_match_values = ["a matching message"]155        result = self.sensor.poke(self.mock_context)156        assert result157        # Test that only filtered messages are deleted158        delete_entries = [{'Id': x['id'], 'ReceiptHandle': 100 + x['id']} for x in matching]159        calls_delete_message_batch = [160            mock.call().delete_message_batch(QueueUrl=QUEUE_URL, Entries=delete_entries)161        ]162        mock_conn.assert_has_calls(calls_delete_message_batch)163    @mock.patch.object(SqsHook, "get_conn")164    def test_poke_message_filtering_jsonpath(self, mock_conn):165        self.sqs_hook.create_queue(QUEUE_NAME)166        matching = [167            {"id": 11, "key": {"matches": [1, 2]}},168            {"id": 12, "key": {"matches": [3, 4, 5]}},169            {"id": 13, "key": {"matches": [10]}},170        ]171        non_matching = [172            {"id": 14, "key": {"nope": [5, 6]}},173            {"id": 15, "key": {"nope": [7, 8]}},174        ]175        all = matching + non_matching176        def mock_receive_message(**kwargs):177            messages = []178            for message in all:179                messages.append(180                    {181                        'MessageId': message['id'],182                        'ReceiptHandle': 100 + message['id'],183                        'Body': json.dumps(message),184                    }185                )186            return {'Messages': messages}187        mock_conn.return_value.receive_message.side_effect 
= mock_receive_message188        def mock_delete_message_batch(**kwargs):189            return {'Successful'}190        mock_conn.return_value.delete_message_batch.side_effect = mock_delete_message_batch191        # Test that messages are filtered192        self.sensor.message_filtering = 'jsonpath'193        self.sensor.message_filtering_config = 'key.matches[*]'194        result = self.sensor.poke(self.mock_context)195        assert result196        # Test that only filtered messages are deleted197        delete_entries = [{'Id': x['id'], 'ReceiptHandle': 100 + x['id']} for x in matching]198        calls_delete_message_batch = [199            mock.call().delete_message_batch(QueueUrl=QUEUE_URL, Entries=delete_entries)200        ]201        mock_conn.assert_has_calls(calls_delete_message_batch)202    @mock.patch.object(SqsHook, "get_conn")203    def test_poke_message_filtering_jsonpath_values(self, mock_conn):204        self.sqs_hook.create_queue(QUEUE_NAME)205        matching = [206            {"id": 11, "key": {"matches": [1, 2]}},207            {"id": 12, "key": {"matches": [1, 4, 5]}},208            {"id": 13, "key": {"matches": [4, 5]}},209        ]210        non_matching = [211            {"id": 21, "key": {"matches": [10]}},212            {"id": 22, "key": {"nope": [5, 6]}},213            {"id": 23, "key": {"nope": [7, 8]}},214        ]215        all = matching + non_matching216        def mock_receive_message(**kwargs):217            messages = []218            for message in all:219                messages.append(220                    {221                        'MessageId': message['id'],222                        'ReceiptHandle': 100 + message['id'],223                        'Body': json.dumps(message),224                    }225                )226            return {'Messages': messages}227        mock_conn.return_value.receive_message.side_effect = mock_receive_message228        def mock_delete_message_batch(**kwargs):229            return 
{'Successful'}230        mock_conn.return_value.delete_message_batch.side_effect = mock_delete_message_batch231        # Test that messages are filtered232        self.sensor.message_filtering = 'jsonpath'233        self.sensor.message_filtering_config = 'key.matches[*]'234        self.sensor.message_filtering_match_values = [1, 4]235        result = self.sensor.poke(self.mock_context)236        assert result237        # Test that only filtered messages are deleted238        delete_entries = [{'Id': x['id'], 'ReceiptHandle': 100 + x['id']} for x in matching]239        calls_delete_message_batch = [240            mock.call().delete_message_batch(QueueUrl='https://test-queue', Entries=delete_entries)241        ]242        mock_conn.assert_has_calls(calls_delete_message_batch)243    @mock.patch.object(SqsHook, "get_conn")244    def test_poke_do_not_delete_message_on_received(self, mock_conn):245        self.sqs_hook.create_queue(QUEUE_NAME)246        self.sqs_hook.send_message(queue_url=QUEUE_URL, message_body='hello')247        self.sensor = SqsSensor(248            task_id='test_task2',249            dag=self.dag,250            sqs_queue=QUEUE_URL,251            aws_conn_id='aws_default',252            # do not delete message upon reception253            delete_message_on_reception=False,254        )...sqs_helper.py
Source:sqs_helper.py  
sqs = boto3.client('sqs')


def delete_message_batch(sqs_queue_url: str, event):
    """Delete every record of ``event`` from the queue in one batch call.

    Args:
        sqs_queue_url: URL of the queue the records were received from.
        event: Mapping with a ``'Records'`` list; each record must provide
            ``'messageId'`` and ``'receiptHandle'``. (This looks like an AWS
            Lambda SQS event - confirm against the caller.)

    Returns:
        The ``delete_message_batch`` response, so callers can inspect any
        per-entry failures it reports, or ``None`` when ``event`` has no
        records and no request is sent.

    Raises:
        KeyError: If ``event`` has no ``'Records'`` key or a record lacks
            ``'messageId'`` / ``'receiptHandle'``.
    """
    entries = [
        {
            "Id": record["messageId"],
            "ReceiptHandle": record["receiptHandle"],
        }
        for record in event['Records']
    ]
    if not entries:
        # Nothing to delete; an empty batch request is rejected by the service.
        return None
    # NOTE(review): the service caps the number of entries per batch call;
    # confirm that callers never pass more records than that limit.
    return sqs.delete_message_batch(QueueUrl=sqs_queue_url, Entries=entries)
...Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
