How to use delete_message_batch method in localstack

Best Python code snippet using localstack_python

test_sqs.py

Source:test_sqs.py Github

copy

Full Screen

...145 }146 )147 return {'Messages': messages}148 mock_conn.return_value.receive_message.side_effect = mock_receive_message149 def mock_delete_message_batch(**kwargs):150 return {'Successful'}151 mock_conn.return_value.delete_message_batch.side_effect = mock_delete_message_batch152 # Test that messages are filtered153 self.sensor.message_filtering = 'literal'154 self.sensor.message_filtering_match_values = ["a matching message"]155 result = self.sensor.poke(self.mock_context)156 assert result157 # Test that only filtered messages are deleted158 delete_entries = [{'Id': x['id'], 'ReceiptHandle': 100 + x['id']} for x in matching]159 calls_delete_message_batch = [160 mock.call().delete_message_batch(QueueUrl=QUEUE_URL, Entries=delete_entries)161 ]162 mock_conn.assert_has_calls(calls_delete_message_batch)163 @mock.patch.object(SqsHook, "get_conn")164 def test_poke_message_filtering_jsonpath(self, mock_conn):165 self.sqs_hook.create_queue(QUEUE_NAME)166 matching = [167 {"id": 11, "key": {"matches": [1, 2]}},168 {"id": 12, "key": {"matches": [3, 4, 5]}},169 {"id": 13, "key": {"matches": [10]}},170 ]171 non_matching = [172 {"id": 14, "key": {"nope": [5, 6]}},173 {"id": 15, "key": {"nope": [7, 8]}},174 ]175 all = matching + non_matching176 def mock_receive_message(**kwargs):177 messages = []178 for message in all:179 messages.append(180 {181 'MessageId': message['id'],182 'ReceiptHandle': 100 + message['id'],183 'Body': json.dumps(message),184 }185 )186 return {'Messages': messages}187 mock_conn.return_value.receive_message.side_effect = mock_receive_message188 def mock_delete_message_batch(**kwargs):189 return {'Successful'}190 mock_conn.return_value.delete_message_batch.side_effect = mock_delete_message_batch191 # Test that messages are filtered192 self.sensor.message_filtering = 'jsonpath'193 self.sensor.message_filtering_config = 'key.matches[*]'194 result = self.sensor.poke(self.mock_context)195 assert result196 # Test that only filtered messages are deleted197 delete_entries = [{'Id': x['id'], 'ReceiptHandle': 100 + x['id']} for x in matching]198 calls_delete_message_batch = [199 mock.call().delete_message_batch(QueueUrl=QUEUE_URL, Entries=delete_entries)200 ]201 mock_conn.assert_has_calls(calls_delete_message_batch)202 @mock.patch.object(SqsHook, "get_conn")203 def test_poke_message_filtering_jsonpath_values(self, mock_conn):204 self.sqs_hook.create_queue(QUEUE_NAME)205 matching = [206 {"id": 11, "key": {"matches": [1, 2]}},207 {"id": 12, "key": {"matches": [1, 4, 5]}},208 {"id": 13, "key": {"matches": [4, 5]}},209 ]210 non_matching = [211 {"id": 21, "key": {"matches": [10]}},212 {"id": 22, "key": {"nope": [5, 6]}},213 {"id": 23, "key": {"nope": [7, 8]}},214 ]215 all = matching + non_matching216 def mock_receive_message(**kwargs):217 messages = []218 for message in all:219 messages.append(220 {221 'MessageId': message['id'],222 'ReceiptHandle': 100 + message['id'],223 'Body': json.dumps(message),224 }225 )226 return {'Messages': messages}227 mock_conn.return_value.receive_message.side_effect = mock_receive_message228 def mock_delete_message_batch(**kwargs):229 return {'Successful'}230 mock_conn.return_value.delete_message_batch.side_effect = mock_delete_message_batch231 # Test that messages are filtered232 self.sensor.message_filtering = 'jsonpath'233 self.sensor.message_filtering_config = 'key.matches[*]'234 self.sensor.message_filtering_match_values = [1, 4]235 result = self.sensor.poke(self.mock_context)236 assert result237 # Test that only filtered messages are deleted238 delete_entries = [{'Id': x['id'], 'ReceiptHandle': 100 + x['id']} for x in matching]239 calls_delete_message_batch = [240 mock.call().delete_message_batch(QueueUrl='https://test-queue', Entries=delete_entries)241 ]242 mock_conn.assert_has_calls(calls_delete_message_batch)243 @mock.patch.object(SqsHook, "get_conn")244 def test_poke_do_not_delete_message_on_received(self, mock_conn):245 self.sqs_hook.create_queue(QUEUE_NAME)246 self.sqs_hook.send_message(queue_url=QUEUE_URL, message_body='hello')247 self.sensor = SqsSensor(248 task_id='test_task2',249 dag=self.dag,250 sqs_queue=QUEUE_URL,251 aws_conn_id='aws_default',252 # do not delete message upon reception253 delete_message_on_reception=False,254 )...

Full Screen

Full Screen

sqs_helper.py

Source:sqs_helper.py Github

copy

Full Screen

23sqs = boto3.client('sqs')456def delete_message_batch(sqs_queue_url: str, event):7 receiptHandles = list(8 map(9 lambda record: {10 "Id": record["messageId"],11 "ReceiptHandle": record["receiptHandle"]12 }, event['Records']))13 response = sqs.delete_message_batch(QueueUrl=sqs_queue_url,14 Entries=receiptHandles) ...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful