How to use the send_message_batch method in LocalStack

Best Python code snippet using localstack_python

batch_sender_test.py

Source:batch_sender_test.py Github

copy

Full Screen

1from botocore.errorfactory import ClientError2from botocore.exceptions import MissingParametersError3from aws_sqs_batch_sender import BatchSender4import unittest5from unittest import mock6class BatchSenderTest(unittest.TestCase):7 def setUp(self):8 self.client = mock.Mock()9 self.client.send_message_batch.return_value = {'Successful': [], 'Failed': []}10 self.queue_url = 'queue_url'11 self.flush_amount = 212 self.batch_sender = BatchSender(self.queue_url, self.client,13 overwrite_by_id=True,14 flush_amount=self.flush_amount,15 max_send_attempts=2,16 max_backoff_sleep_secs=0.0000001)17 def test_send_message_does_not_immediately_send(self):18 self.batch_sender.send_message(Id='1', MessageBody=b'')19 self.assertFalse(self.client.send_message_batch.called)20 self.assertFalse(self.client.send_messages.called)21 def test_send_message_flushes_at_flush_amount(self):22 self.batch_sender.send_message(Id='1', MessageBody=b'')23 self.batch_sender.send_message(Id='2', MessageBody=b'')24 expected = [{"QueueUrl": "queue_url",25 "Entries": [{'Id': '1', 'MessageBody': b''},26 {'Id': '2', 'MessageBody': b''}]}]27 self.assert_send_message_batch_calls_are(expected)28 self.assert_messages_buffer_empty()29 def test_multiple_flushes_reset_messages_to_send(self):30 self.batch_sender.send_message(Id='1', MessageBody=b'')31 self.batch_sender.send_message(Id='2', MessageBody=b'')32 self.batch_sender.send_message(Id='3', MessageBody=b'')33 self.batch_sender.send_message(Id='4', MessageBody=b'')34 first_batch = {"QueueUrl": "queue_url",35 "Entries": [{'Id': '1', 'MessageBody': b''},36 {'Id': '2', 'MessageBody': b''}]}37 second_batch = {"QueueUrl": "queue_url",38 "Entries": [{'Id': '3', 'MessageBody': b''},39 {'Id': '4', 'MessageBody': b''}]}40 self.assert_send_message_batch_calls_are([first_batch, second_batch])41 self.assert_messages_buffer_empty()42 def test_flush_resends_batch_on_repeatedly_fully_unprocessed_batch(self):43 self.client.send_message_batch.side_effect = [44 {45 'Failed': 
[{"Id": "1", "SenderFault": False},46 {"Id": "2", "SenderFault": False}]47 },48 {49 'Failed': [{"Id": "1", "SenderFault": False},50 {"Id": "2", "SenderFault": False}]51 },52 {53 'Failed': []54 },55 {56 'Failed': []57 }58 ]59 with self.batch_sender:60 self.batch_sender.send_message(Id='1', MessageBody=b'')61 self.batch_sender.send_message(Id='2', MessageBody=b'')62 self.batch_sender.send_message(Id='3', MessageBody=b'')63 first_batch = {"QueueUrl": "queue_url",64 "Entries": [{'Id': '1', 'MessageBody': b''},65 {'Id': '2', 'MessageBody': b''}]}66 second_batch = {"QueueUrl": "queue_url",67 "Entries": [{'Id': '3', 'MessageBody': b''}]}68 self.assert_send_message_batch_calls_are([first_batch, first_batch, first_batch, second_batch])69 self.assert_messages_buffer_empty()70 def test_fails_on_failed_send_because_of_sender_fault(self):71 self.client.send_message_batch.side_effect = [72 {73 'Failed': [{"Id": "1", "SenderFault": True, "Code": "Code"},74 {"Id": "2", "SenderFault": False, "Code": "Code"}]75 }76 ]77 with self.assertRaises(ClientError) as context:78 self.batch_sender.send_message(Id='1', MessageBody=b'')79 self.batch_sender.send_message(Id='2', MessageBody=b'')80 self.assertEqual(context.exception, "Code")81 self.assert_messages_buffer_is([{"Id": '2', "MessageBody": b''}])82 def test_unprocessed_items_added_to_next_batch(self):83 # Suppose the server sends backs a response that indicates that84 # one item was unprocessed.85 self.client.send_message_batch.side_effect = [86 {87 'Failed': [{"Id": "2", "SenderFault": False}]88 },89 # Then everything went through90 {},91 {}92 ]93 with self.batch_sender:94 self.batch_sender.send_message(Id='1', MessageBody=b'')95 self.batch_sender.send_message(Id='2', MessageBody=b'')96 self.batch_sender.send_message(Id='3', MessageBody=b'')97 self.batch_sender.send_message(Id='4', MessageBody=b'')98 first_batch = {"QueueUrl": "queue_url",99 "Entries": [{'Id': '1', 'MessageBody': b''},100 {'Id': '2', 'MessageBody': b''}]}101 
second_batch = {"QueueUrl": "queue_url",102 "Entries": [{'Id': '2', 'MessageBody': b''},103 {'Id': '3', 'MessageBody': b''}]}104 third_batch = {"QueueUrl": "queue_url",105 "Entries": [{'Id': '4', 'MessageBody': b''}]}106 self.assert_send_message_batch_calls_are([first_batch, second_batch, third_batch])107 self.assert_messages_buffer_empty()108 def test_all_messages_sent_on_exit(self):109 with self.batch_sender as b:110 b.send_message(Id='1', MessageBody=b'')111 self.assert_send_message_batch_calls_are([{"QueueUrl": "queue_url",112 "Entries": [{'Id': '1', 'MessageBody': b''}]}])113 self.assert_messages_buffer_empty()114 def test_never_send_more_than_max_batch_size(self):115 # Suppose the server sends backs a response that indicates that116 # all the items were unprocessed.117 self.client.send_message_batch.side_effect = [118 {119 'Failed': [{"Id": "1", "SenderFault": False},120 {"Id": "2", "SenderFault": False}]121 },122 {},123 # Then the last response shows that everything went through124 {}125 ]126 with BatchSender(self.queue_url, self.client, flush_amount=2) as b:127 b.send_message(Id='1', MessageBody=b'')128 b.send_message(Id='2', MessageBody=b'')129 b.send_message(Id='3', MessageBody=b'')130 # Note how we're never sending more than flush_amount=2.131 first_batch = {"QueueUrl": "queue_url",132 "Entries": [{'Id': '1', 'MessageBody': b''},133 {'Id': '2', 'MessageBody': b''}]}134 # Even when the server sends us unprocessed items of 2 elements,135 # we'll still only send 2 at a time, in order.136 second_batch = {"QueueUrl": "queue_url",137 "Entries": [{'Id': '1', 'MessageBody': b''},138 {'Id': '2', 'MessageBody': b''}]}139 # And then we still see one more unprocessed item so140 # we need to send another batch.141 third_batch = {"QueueUrl": "queue_url",142 "Entries": [{'Id': '3', 'MessageBody': b''}]}143 self.assert_send_message_batch_calls_are([first_batch, second_batch,144 third_batch])145 self.assert_messages_buffer_empty()146 def 
test_repeated_flushing_on_exit(self):147 # We're going to simulate failed items148 # returning multiple failed items across calls.149 self.client.send_message_batch.side_effect = [150 {151 'Failed': [152 {"Id": "2", "SenderFault": False}153 ],154 },155 {156 'Failed': [157 {"Id": "2", "SenderFault": False}158 ],159 },160 {}161 ]162 with BatchSender(self.queue_url, self.client, overwrite_by_id=False, flush_amount=2) as b:163 b.send_message(Id='1', MessageBody=b'')164 b.send_message(Id='2', MessageBody=b'')165 # So when we exit, we expect three calls.166 # First we try the normal batch write with 3 items:167 first_batch = {"QueueUrl": "queue_url",168 "Entries": [{'Id': '1', 'MessageBody': b''},169 {'Id': '2', 'MessageBody': b''}]}170 second_batch = {"QueueUrl": "queue_url",171 "Entries": [{'Id': '2', 'MessageBody': b''}]}172 third_batch = {"QueueUrl": "queue_url",173 "Entries": [{'Id': '2', 'MessageBody': b''}]}174 self.assert_send_message_batch_calls_are([first_batch, second_batch,175 third_batch])176 self.assert_messages_buffer_empty()177 def test_auto_dedup_for_dup_requests_at_the_beginning(self):178 with BatchSender(self.queue_url, self.client,179 flush_amount=2, overwrite_by_id=True) as b:180 b.send_message(Id='1', MessageBody=b'first')181 b.send_message(Id='1', MessageBody=b'second')182 b.send_message(Id='1', MessageBody=b'third')183 b.send_message(Id='2', MessageBody=b'')184 first_batch = {"QueueUrl": "queue_url",185 "Entries": [{'Id': '1', 'MessageBody': b'third'},186 {'Id': '2', 'MessageBody': b''}]}187 self.assert_send_message_batch_calls_are([first_batch])188 self.assert_messages_buffer_empty()189 def test_auto_dedup_for_dup_requests_at_the_end_with_flush_in_between(self):190 with BatchSender(self.queue_url, self.client,191 flush_amount=2, overwrite_by_id=True) as b:192 b.send_message(Id='1', MessageBody=b'')193 b.send_message(Id='2', MessageBody=b'first')194 b.send_message(Id='2', MessageBody=b'second')195 b.send_message(Id='2', MessageBody=b'third')196 
first_batch = {"QueueUrl": "queue_url",197 "Entries": [{'Id': '1', 'MessageBody': b''},198 {'Id': '2', 'MessageBody': b'first'}]}199 second_batch = {"QueueUrl": "queue_url",200 "Entries": [{'Id': '2', 'MessageBody': b'third'}]}201 self.assert_send_message_batch_calls_are([first_batch, second_batch])202 self.assert_messages_buffer_empty()203 def test_fails_on_missing_id(self):204 with self.assertRaises(MissingParametersError) as context:205 self.batch_sender.send_message()206 self.assertEqual(str(context.exception), "The following required parameters are missing for Message: Id")207 self.assert_messages_buffer_empty()208 def test_flush_aborts_on_repeated_failure(self):209 # We're going to simulate failed items210 # returning multiple failed items across calls.211 self.client.send_message_batch.side_effect = [212 {213 'Failed': [214 {"Id": "1", "SenderFault": False, "Code": "Code"}215 ],216 },217 {218 'Failed': [219 {"Id": "1", "SenderFault": False, "Code": "Code"}220 ],221 },222 {223 'Failed': [224 {"Id": "1", "SenderFault": False, "Code": "Code"}225 ],226 }227 ]228 with self.assertRaises(ClientError):229 self.batch_sender.send_message(Id='1', MessageBody=b'')230 self.batch_sender.flush()231 self.assert_messages_buffer_is([{"Id": '1', "MessageBody": b''}])232 def assert_send_message_batch_calls_are(self, expected_calls):233 self.assertEqual(self.client.send_message_batch.call_count,234 len(expected_calls))235 calls = [c[1] for c in self.client.send_message_batch.call_args_list]236 self.assertEqual(expected_calls, calls)237 def assert_messages_buffer_empty(self):238 self.assertEqual([], self.batch_sender._messages_buffer)239 def assert_messages_buffer_is(self, expected_messages_buffer):...

Full Screen

Full Screen

scan_initiator_test.py

Source:scan_initiator_test.py Github

copy

Full Screen

1import pytest2import os3import itertools4from unittest.mock import patch, MagicMock, call5from test_utils.test_utils import AsyncContextManagerMock, coroutine_of6from boto3.dynamodb.conditions import Key7from decimal import Decimal8TEST_ENV = {9 "REGION": "eu-west-wood",10 "STAGE": "door",11 "APP_NAME": "me-once",12 "USE_XRAY": "0"13}14@pytest.fixture15async def scan_initiator():16 with patch.dict(os.environ, TEST_ENV), \17 patch("aioboto3.client") as boto_client, \18 patch("aioboto3.resource") as boto_resource:19 # ensure each client is a different mock20 boto_client.side_effect = (MagicMock() for _ in itertools.count())21 boto_resource.side_effect = (MagicMock() for _ in itertools.count())22 from scan_initiator import scan_initiator23 yield scan_initiator24 scan_initiator.dynamo_resource.reset_mock()25 scan_initiator.sqs_client.reset_mock()26 await scan_initiator.clean_clients()27@patch.dict(os.environ, TEST_ENV)28def set_ssm_return_vals(ssm_client, period, buckets):29 stage = os.environ["STAGE"]30 app_name = os.environ["APP_NAME"]31 ssm_prefix = f"/{app_name}/{stage}"32 ssm_client.get_parameters.return_value = coroutine_of({33 "Parameters": [34 {"Name": f"{ssm_prefix}/scheduler/dynamodb/scans_planned/id", "Value": "MyTableId"},35 {"Name": f"{ssm_prefix}/scheduler/dynamodb/scans_planned/plan_index", "Value": "MyIndexName"},36 {"Name": f"{ssm_prefix}/scheduler/dynamodb/address_info/id", "Value": "MyIndexName"},37 {"Name": f"{ssm_prefix}/scheduler/config/period", "Value": str(period)},38 {"Name": f"{ssm_prefix}/scheduler/config/buckets", "Value": str(buckets)},39 {"Name": f"{ssm_prefix}/scheduler/scan_delay_queue", "Value": "MyDelayQueue"}40 ]41 })42def _mock_delete_responses(mock_plan_table, side_effects):43 mock_batch_writer = AsyncContextManagerMock()44 mock_plan_table.batch_writer.return_value = mock_batch_writer45 mock_batch_writer.aenter.delete_item.side_effect = side_effects46 return mock_batch_writer.aenter47@patch("time.time", 
return_value=1984)48@pytest.mark.unit49def test_paginates_scan_results(_, scan_initiator):50 # ssm params don"t matter much in this test51 set_ssm_return_vals(scan_initiator.ssm_client, 40, 10)52 # access mock for dynamodb table53 mock_info_table, mock_plan_table = _mock_resources(scan_initiator)54 # return a single result but with a last evaluated key present, second result wont have55 # that key56 mock_plan_table.scan.side_effect = iter([57 coroutine_of({58 "Items": [{59 "Address": "123.456.123.456",60 "DnsIngestTime": 12345,61 "PlannedScanTime": 6789062 }],63 "LastEvaluatedKey": "SomeKey"64 }),65 coroutine_of({66 "Items": [{67 "Address": "456.345.123.123",68 "DnsIngestTime": 123456,69 "PlannedScanTime": 6789070 }]71 }),72 ])73 mock_info_table.update_item.side_effect = iter([coroutine_of(None), coroutine_of(None)])74 # pretend the sqs messages are all successfully dispatched75 scan_initiator.sqs_client.send_message_batch.side_effect = [76 coroutine_of(None),77 coroutine_of(None)78 ]79 # pretend the delete item calls are all successful too80 writer = _mock_delete_responses(mock_plan_table, [coroutine_of(None), coroutine_of(None)])81 # actually do the test82 scan_initiator.initiate_scans({}, MagicMock())83 # check the scan happens twice, searching for planned scans earlier than 1984 + 40/10 i.e. 
now + bucket_length84 assert mock_plan_table.scan.call_args_list == [85 call(86 IndexName="MyIndexName",87 FilterExpression=Key("PlannedScanTime").lte(Decimal(1988))88 ),89 call(90 IndexName="MyIndexName",91 FilterExpression=Key("PlannedScanTime").lte(Decimal(1988)),92 ExclusiveStartKey="SomeKey"93 )94 ]95 # Doesn"t batch across pages96 assert scan_initiator.sqs_client.send_message_batch.call_count == 297 assert writer.delete_item.call_count == 298def _mock_resources(scan_initiator):99 mock_plan_table, mock_info_table = (MagicMock(), MagicMock())100 scan_initiator.dynamo_resource.Table.side_effect = iter([mock_plan_table, mock_info_table])101 scan_initiator.dynamo_resource.close.return_value = coroutine_of(None)102 scan_initiator.ssm_client.close.return_value = coroutine_of(None)103 scan_initiator.sqs_client.close.return_value = coroutine_of(None)104 return mock_info_table, mock_plan_table105@patch("time.time", return_value=1984)106@pytest.mark.unit107def test_replace_punctuation_in_address_ids(_, scan_initiator):108 # ssm params don"t matter much in this test109 set_ssm_return_vals(scan_initiator.ssm_client, 100, 4)110 # access mock for dynamodb table111 mock_info_table, mock_plan_table = _mock_resources(scan_initiator)112 # return a single result with ip4 and another with ip6113 mock_plan_table.scan.side_effect = iter([114 coroutine_of({115 "Items": [116 {117 "Address": "123.456.123.456",118 "DnsIngestTime": 12345,119 "PlannedScanTime": 67890120 },121 {122 "Address": "2001:0db8:85a3:0000:0000:8a2e:0370:7334",123 "DnsIngestTime": 12345,124 "PlannedScanTime": 67890125 }126 ]127 })128 ])129 mock_info_table.update_item.side_effect = iter([coroutine_of(None), coroutine_of(None)])130 # pretend the sqs and dynamo deletes are all ok131 scan_initiator.sqs_client.send_message_batch.side_effect = [coroutine_of(None)]132 _mock_delete_responses(mock_plan_table, [coroutine_of(None), coroutine_of(None)])133 # actually do the test134 scan_initiator.initiate_scans({}, 
MagicMock())135 # check both addresses have : and . replaced with -136 scan_initiator.sqs_client.send_message_batch.assert_called_once_with(137 QueueUrl="MyDelayQueue",138 Entries=[139 {140 "Id": "123-456-123-456",141 "DelaySeconds": 67890-1984, # planned scan time minus now time142 "MessageBody": "{\"AddressToScan\":\"123.456.123.456\"}"143 },144 {145 "Id": "2001-0db8-85a3-0000-0000-8a2e-0370-7334",146 "DelaySeconds": 67890-1984, # planned scan time minus now time147 "MessageBody": "{\"AddressToScan\":\"2001:0db8:85a3:0000:0000:8a2e:0370:7334\"}"148 }149 ]150 )151@patch("time.time", return_value=1984)152@pytest.mark.unit153def test_batches_sqs_writes(_, scan_initiator):154 # ssm params don"t matter much in this test155 set_ssm_return_vals(scan_initiator.ssm_client, 100, 4)156 # access mock for dynamodb table157 mock_info_table, mock_plan_table = _mock_resources(scan_initiator)158 # send 32 responses in a single scan result, will be batched into groups of 10 for159 # sqs160 mock_plan_table.scan.side_effect = iter([161 coroutine_of({162 "Items": [163 {164 "Address": f"123.456.123.{item_num}",165 "DnsIngestTime": 12345,166 "PlannedScanTime": 67890167 }168 for item_num in range(0, 32)169 ]170 })171 ])172 mock_info_table.update_item.side_effect = iter([coroutine_of(None) for _ in range(0, 32)])173 # pretend the sqs and dynamo deletes are all ok, there are 4 calls to sqs174 # and175 scan_initiator.sqs_client.send_message_batch.side_effect = [176 coroutine_of(None) for _ in range(0, 4)177 ]178 writer = _mock_delete_responses(mock_plan_table, [179 coroutine_of(None) for _ in range(0, 32)180 ])181 # actually do the test182 scan_initiator.initiate_scans({}, MagicMock())183 # There will be 4 calls to sqs184 assert scan_initiator.sqs_client.send_message_batch.call_count == 4185 # The last batch will have 2 remaining items in it N.B. 
a call object is a tuple of the186 # positional args and then the kwags187 assert len(scan_initiator.sqs_client.send_message_batch.call_args_list[3][1]["Entries"]) == 2188 # There will be individual deletes for each address i.e. 32 of them189 assert writer.delete_item.call_count == 32190@patch("time.time", return_value=1984)191@pytest.mark.unit192def test_no_deletes_until_all_sqs_success(_, scan_initiator):193 # ssm params don"t matter much in this test194 set_ssm_return_vals(scan_initiator.ssm_client, 100, 4)195 # access mock for dynamodb table196 mock_info_table, mock_plan_table = _mock_resources(scan_initiator)197 # send a single response198 mock_plan_table.scan.side_effect = [199 coroutine_of({200 "Items": [201 {202 "Address": f"123.456.123.5",203 "DnsIngestTime": 12345,204 "PlannedScanTime": 67890205 }206 ]207 })208 ]209 # pretend the sqs and dynamo deletes are all ok, there are 4 calls to sqs210 # and211 scan_initiator.sqs_client.send_message_batch.side_effect = [212 Exception("test error")213 ]214 writer = _mock_delete_responses(mock_plan_table, [])215 # actually do the test216 with pytest.raises(Exception):217 scan_initiator.initiate_scans({}, MagicMock())218 # There will be 1 call to sqs219 assert scan_initiator.sqs_client.send_message_batch.call_count == 1220 # and none to dynamo...

Full Screen

Full Screen

a_sqs.py

Source:a_sqs.py Github

copy

Full Screen

...4 def __init__(self, session, queue):5 self.session = session6 self.client = self.session.client("sqs")7 self.queue = queue8 def send_message_batch(self, batches):9 try:10 response = self.client.send_message_batch(11 QueueUrl=self.queue,12 Entries=batches13 )14 except botocore.exceptions.ClientError as e:15 print(e)16 half = len(batches)//217 a = batches[:half]18 b = batches[half:]19 response = []20 if len(a) > 0:21 response.append(self.client.send_message_batch(22 QueueUrl=self.queue,23 Entries=a24 ))25 if len(b) > 0:26 response.append(self.client.send_message_batch(27 QueueUrl=self.queue,28 Entries=b29 ))...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub — right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile lists of step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel to get step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful