Best Python code snippet using localstack_python
test_batch_write.py
Source:test_batch_write.py  
...31        item = self.build_x_item('S', 'forum1', 'subject2',32                                 ('message', 'S', 'message text'))33        request_body = {'request_items': {self.tname: [{'put_request':34                                                        {'item': item}}]}}35        headers, body = self.client.batch_write_item(request_body)36        self.assertIn('unprocessed_items', body)37        self.assertEqual(body['unprocessed_items'], {})38        key_conditions = {39            self.hashkey: {40                'attribute_value_list': [{'S': 'forum1'}],41                'comparison_operator': 'EQ'42            }43        }44        headers, body = self.client.query(self.tname,45                                          key_conditions=key_conditions,46                                          consistent_read=True)47        self.assertEqual(item, body['items'][0])48    @attr(type=['BWI-2'])49    def test_batch_write_all_params(self):50        self._create_test_table(self.build_x_attrs('S'),51                                self.tname,52                                self.smoke_schema,53                                wait_for_active=True)54        item = self.build_x_item('S', 'forum1', 'subject2',55                                 ('message', 'S', 'message text'))56        request_body = {'request_items': {self.tname: [{'put_request':57                                                        {'item': item}}]}}58        headers, body = self.client.batch_write_item(request_body)59        self.assertIn('unprocessed_items', body)60        self.assertEqual(body['unprocessed_items'], {})61        key_conditions = {62            self.hashkey: {63                'attribute_value_list': [{'S': 'forum1'}],64                'comparison_operator': 'EQ'65            }66        }67        headers, body = self.client.query(self.tname,68                                          key_conditions=key_conditions,69                                          
consistent_read=True)70        self.assertEqual(item, body['items'][0])71    @attr(type=['BWI-10'])72    def test_batch_write_put_n(self):73        self._create_test_table(self.build_x_attrs('N'),74                                self.tname,75                                self.smoke_schema,76                                wait_for_active=True)77        item = self.build_x_item('N', '1000', '2000', ('message', 'N', '1001'))78        request_body = {'request_items': {self.tname: [{'put_request':79                                                        {'item': item}}]}}80        headers, body = self.client.batch_write_item(request_body)81        key_conditions = {82            self.hashkey: {83                'attribute_value_list': [{'N': '1000'}],84                'comparison_operator': 'EQ'85            }86        }87        headers, body = self.client.query(self.tname,88                                          key_conditions=key_conditions,89                                          consistent_read=True)90        self.assertEqual(item, body['items'][0])91    @attr(type=['BWI-13'])92    def test_batch_write_put_n_ns(self):93        self._create_test_table(self.build_x_attrs('N'),94                                self.tname,95                                self.smoke_schema,96                                wait_for_active=True)97        item = self.build_x_item('N', '1000', '2000',98                                 ('message', 'NS', ['1001', '1002']))99        request_body = {'request_items': {self.tname: [{'put_request':100                                                        {'item': item}}]}}101        headers, body = self.client.batch_write_item(request_body)102        key_conditions = {103            self.hashkey: {104                'attribute_value_list': [{'N': '1000'}],105                'comparison_operator': 'EQ'106            }107        }108        headers, body = self.client.query(self.tname,109                                          
key_conditions=key_conditions,110                                          consistent_read=True)111        self.assertEqual(item, body['items'][0])112    @attr(type=['BWI-36'])113    def test_batch_write_delete_s(self):114        self._create_test_table(self.smoke_attrs,115                                self.tname,116                                self.smoke_schema,117                                wait_for_active=True)118        self.put_smoke_item(self.tname, 'forum1', 'subject2',119                            'message text', 'John', '10')120        key_conditions = {121            self.hashkey: {122                'attribute_value_list': [{'S': 'forum1'}],123                'comparison_operator': 'EQ'124            },125            self.rangekey: {126                'attribute_value_list': [{'S': 'subject'}],127                'comparison_operator': 'BEGINS_WITH'128            }129        }130        headers, body = self.client.query(self.tname,131                                          key_conditions=key_conditions,132                                          consistent_read=True)133        if not body['count']:134            raise exceptions.TempestException("No item to delete.")135        key = {self.hashkey: {'S': 'forum1'}, self.rangekey: {'S': 'subject2'}}136        request_body = {'request_items': {self.tname: [{'delete_request':137                                                        {'key': key}}]}}138        self.client.batch_write_item(request_body)139        headers, body = self.client.query(self.tname,140                                          key_conditions=key_conditions,141                                          consistent_read=True)142        self.assertEqual(0, body['count'])143    @attr(type=['BWI-53'])144    def test_batch_write_delete(self):145        self._create_test_table(self.smoke_attrs,146                                self.tname,147                                self.smoke_schema,148                                
wait_for_active=True)149        self.put_smoke_item(self.tname, 'forum1', 'subject2',150                            'message text', 'John', '10')151        key_conditions = {152            self.hashkey: {153                'attribute_value_list': [{'S': 'forum1'}],154                'comparison_operator': 'EQ'155            },156            self.rangekey: {157                'attribute_value_list': [{'S': 'subject'}],158                'comparison_operator': 'BEGINS_WITH'159            }160        }161        headers, body = self.client.query(self.tname,162                                          key_conditions=key_conditions,163                                          consistent_read=True)164        if not body['count']:165            raise exceptions.TempestException("No item to delete.")166        key = {self.hashkey: {'S': 'forum1'}, self.rangekey: {'S': 'subject2'}}167        request_body = {'request_items': {self.tname: [{'delete_request':168                                                        {'key': key}}]}}169        self.client.batch_write_item(request_body)170        headers, body = self.client.query(self.tname,171                                          key_conditions=key_conditions,172                                          consistent_read=True)173        self.assertEqual(0, body['count'])174    @attr(type=['BWI-40'])175    def test_batch_write_put_delete(self):176        self._create_test_table(self.smoke_attrs,177                                self.tname,178                                self.smoke_schema,179                                wait_for_active=True)180        self.put_smoke_item(self.tname, 'forum1', 'subject2',181                            'message text', 'John', '10')182        key_conditions = {183            self.hashkey: {184                'attribute_value_list': [{'S': 'forum1'}],185                'comparison_operator': 'EQ'186            },187            self.rangekey: {188                'attribute_value_list': [{'S': 
'subject'}],189                'comparison_operator': 'BEGINS_WITH'190            }191        }192        headers, body = self.client.query(self.tname,193                                          key_conditions=key_conditions,194                                          consistent_read=True)195        if not body['count']:196            raise exceptions.TempestException("No item to delete.")197        item = self.build_smoke_item('forum1', 'subject3',198                                     'message text', 'John', '10')199        key = {self.hashkey: {'S': 'forum1'}, self.rangekey: {'S': 'subject2'}}200        request_body = {'request_items': {self.tname: [{'delete_request':201                                                        {'key': key}},202                                                       {'put_request':203                                                        {'item': item}}]}}204        self.client.batch_write_item(request_body)205        headers, body = self.client.query(self.tname,206                                          key_conditions=key_conditions,207                                          consistent_read=True)208        self.assertEqual(1, body['count'])209        self.assertIn(item, body['items'])210    @attr(type=['BWI-51'])211    def test_batch_write_put_25_tables(self):212        tnames = [rand_name().replace('-', '') for i in range(0, 25)]213        for tname in tnames:214            self._create_test_table(self.smoke_attrs,215                                    tname,216                                    self.smoke_schema,217                                    wait_for_active=True)218        item = self.build_smoke_item('forum1', 'subject3',219                                     'message text', 'John', '10')220        request_body = {'request_items': dict(221            (tname, [{'put_request': {'item': item}}]) for tname in tnames)}222        key_conditions = {223            self.hashkey: {224                
'attribute_value_list': [{'S': 'forum1'}],225                'comparison_operator': 'EQ'226            },227            self.rangekey: {228                'attribute_value_list': [{'S': 'subject'}],229                'comparison_operator': 'BEGINS_WITH'230            }231        }232        self.client.batch_write_item(request_body)233        for tname in tnames:234            headers, body = self.client.query(tname,235                                              key_conditions=key_conditions,236                                              consistent_read=True)237            self.assertEqual(1, body['count'])238            self.assertIn(item, body['items'])239    @attr(type=['BWI-16'])240    def test_batch_write_put_s(self):241        self._create_test_table(self.build_x_attrs('S'),242                                self.tname,243                                self.smoke_schema,244                                wait_for_active=True)245        item = self.build_x_item('S', 'forum1', 'subject2',246                                 ('message', 'S', 'message text'))247        request_body = {'request_items': {self.tname: [{'put_request':248                                                        {'item': item}}]}}249        self.client.batch_write_item(request_body)250        key_conditions = {251            self.hashkey: {252                'attribute_value_list': [{'S': 'forum1'}],253                'comparison_operator': 'EQ'254            }255        }256        headers, body = self.client.query(self.tname,257                                          key_conditions=key_conditions,258                                          consistent_read=True)259        self.assertEqual(item, body['items'][0])260    @attr(type=['BWI-14'])261    def test_batch_write_put_b(self):262        self._create_test_table(self.build_x_attrs('S'),263                                self.tname,264                                self.smoke_schema,265                                
wait_for_active=True)266        value = base64.b64encode('\xFF')267        item = self.build_x_item('S', 'forum1', 'subject2',268                                 ('message', 'B', value))269        request_body = {'request_items': {self.tname: [{'put_request':270                                                        {'item': item}}]}}271        self.client.batch_write_item(request_body)272        key_conditions = {273            self.hashkey: {274                'attribute_value_list': [{'S': 'forum1'}],275                'comparison_operator': 'EQ'276            }277        }278        headers, body = self.client.query(self.tname,279                                          key_conditions=key_conditions,280                                          consistent_read=True)281        self.assertEqual(item, body['items'][0])282    @attr(type=['BWI-15'])283    def test_batch_write_put_bs(self):284        self._create_test_table(self.build_x_attrs('S'),285                                self.tname,286                                self.smoke_schema,287                                wait_for_active=True)288        value1 = base64.b64encode('\xFF\x01')289        value2 = base64.b64encode('\xFF\x02')290        item = self.build_x_item('S', 'forum1', 'subject2',291                                 ('message', 'BS', [value1, value2]))292        request_body = {'request_items': {self.tname: [{'put_request':293                                                        {'item': item}}]}}294        self.client.batch_write_item(request_body)295        key_conditions = {296            self.hashkey: {297                'attribute_value_list': [{'S': 'forum1'}],298                'comparison_operator': 'EQ'299            }300        }301        headers, body = self.client.query(self.tname,302                                          key_conditions=key_conditions,303                                          consistent_read=True)304        self.assertEqual(item, body['items'][0])305    
@attr(type=['BWI-50'])306    def test_batch_write_correct_tname_and_put_request(self):307        self._create_test_table(self.build_x_attrs('S'),308                                self.tname,309                                self.smoke_schema,310                                wait_for_active=True)311        item = self.build_x_item('S', 'forum1', 'subject2')312        request_body = {'request_items': {self.tname: [{'put_request':313                                                        {'item': item}}]}}314        self.client.batch_write_item(request_body)315        key_conditions = {316            self.hashkey: {317                'attribute_value_list': [{'S': 'forum1'}],318                'comparison_operator': 'EQ'319            }320        }321        headers, body = self.client.query(self.tname,322                                          key_conditions=key_conditions,323                                          consistent_read=True)324        self.assertEqual(item, body['items'][0])325    @attr(type=['BWI-42', 'negative'])326    def test_batch_write_put_delete_same_item(self):327        self._create_test_table(self.smoke_attrs, self.tname,328                                self.smoke_schema,329                                wait_for_active=True)330        item = self.build_smoke_item('forum1', 'subject2',331                                     'message text', 'John', '10')332        key = {333            self.hashkey: {'S': 'forum1'},334            self.rangekey: {'S': 'subject2'}335        }336        request_body = {337            'request_items': {self.tname: [338                {'put_request': {'item': item}},339                {'delete_request': {'key': key}}340            ]}341        }342        with self.assertRaises(exceptions.BadRequest) as raises_cm:343            self.client.batch_write_item(request_body)344        exception = raises_cm.exception345        self.assertIn("ValidationError", exception._error_string)346        self.assertIn("More 
than one", exception._error_string)347    @attr(type=['BWI-54_3'])348    def test_batch_write_non_existent_table(self):349        tname = 'non_existent_table'350        item = self.build_x_item('S', 'forum1', 'subject2',351                                 ('message', 'S', 'message text'))352        request_body = {'request_items': {tname: [{'put_request':353                                                   {'item': item}}]}}354        with self.assertRaises(exceptions.NotFound) as raises_cm:355            self.client.batch_write_item(request_body)356        error_msg = raises_cm.exception._error_string357        self.assertIn("Not Found", error_msg)358        self.assertIn("Table 'non_existent_table' does not exist", error_msg)359    @attr(type=['BWI-12'])360    def test_batch_write_put_empty_attr_name(self):361        self._create_test_table(self.build_x_attrs('S'),362                                self.tname,363                                self.smoke_schema,364                                wait_for_active=True)365        item = self.build_x_item('S', 'forum1', 'subject2',366                                 ('', 'N', '1000'))367        request_body = {'request_items': {self.tname: [{'put_request':368                                                        {'item': item}}]}}369        with self.assertRaises(exceptions.BadRequest) as raises_cm:370            self.client.batch_write_item(request_body)371        error_msg = raises_cm.exception._error_string372        self.assertIn("Bad Request", error_msg)...test_index.py
Source:test_index.py  
1# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.2# SPDX-License-Identifier: Apache-2.03import os4from unittest import TestCase5from unittest.mock import patch, MagicMock6import urllib.parse7with patch("boto3.resource") as boto_resource_mock:8    from functions.usergamedata.DeleteBundle import index9BUNDLES_TABLE_NAME = 'test_bundles_table'10ITEMS_TABLE_NAME = 'test_bundleitems_table'11# Patch Lambda environment variables:12@patch.dict(os.environ, {13    'BUNDLES_TABLE_NAME': BUNDLES_TABLE_NAME,14    'BUNDLE_ITEMS_TABLE_NAME': ITEMS_TABLE_NAME15})16class TestDeleteBundle(TestCase):17    def setUp(self):18        index.ddb_resource = MagicMock()19    def test_delete_bundle_item_key_missing_returns_400_error(self):20        # Arrange21        event = self.get_lambda_event()22        event['queryStringParameters'] = {'payload': urllib.parse.quote('{}')}23        # Act24        result = index.lambda_handler(event, None)25        # Assert26        self.assertEqual(400, result['statusCode'])27        index.ddb_resource.batch_write_item.assert_not_called()28    def test_delete_bundle_path_parameters_is_empty_returns_400_error(self):29        # Arrange30        event = self.get_lambda_event()31        event['pathParameters'] = {}32        # Act33        result = index.lambda_handler(event, None)34        # Assert35        self.assertEqual(400, result['statusCode'])36        index.ddb_resource.batch_write_item.assert_not_called()37    def test_delete_bundle_bundle_name_empty_returns_400_error(self):38        # Arrange39        event = self.get_lambda_event()40        event['pathParameters'] = {'bundle_name': None}41        # Act42        result = index.lambda_handler(event, None)43        # Assert44        self.assertEqual(400, result['statusCode'])45        index.ddb_resource.batch_write_item.assert_not_called()46    def test_delete_bundle_bundle_name_too_long_returns_400_error(self):47        # Arrange48        event = self.get_lambda_event()49        
event['pathParameters'] = {'bundle_name': 'x' * 256}50        # Act51        result = index.lambda_handler(event, None)52        # Assert53        self.assertEqual(400, result['statusCode'])54        index.ddb_resource.batch_write_item.assert_not_called()55    def test_delete_bundle_payload_too_long_returns_400_error(self):56        # Arrange57        event = self.get_lambda_event()58        event['queryStringParameters'] = {'payload': urllib.parse.quote('x' * 1025)}59        # Act60        result = index.lambda_handler(event, None)61        # Assert62        self.assertEqual(400, result['statusCode'])63        index.ddb_resource.batch_write_item.assert_not_called()64    def test_delete_bundle_invalid_player_returns_401_error(self):65        # Arrange66        event = self.get_lambda_event()67        event['requestContext'] = {'authorizer': {'claims': {}}}68        # Act69        result = index.lambda_handler(event, None)70        # Assert71        self.assertEqual(401, result['statusCode'])72        index.ddb_resource.batch_write_item.assert_not_called()73    def test_delete_bundle_key_passed_in_query_string_returns_204_success(self):74        # Arrange75        event = self.get_lambda_event()76        event['queryStringParameters'] = {'payload': urllib.parse.quote('{"bundle_item_keys": ["SCORE1","SCORE2"]}')}77        # Act78        result = index.lambda_handler(event, None)79        # Assert80        self.assertEqual(204, result['statusCode'])81        index.ddb_resource.batch_write_item.assert_called_once_with(82            RequestItems={'test_bundleitems_table': [83                {'DeleteRequest':84                     {'Key': {'player_id_bundle': '12345678-1234-1234-1234-123456789012_BANANA_BUNDLE', 'bundle_item_key': 'SCORE1'}}},85                {'DeleteRequest':86                     {'Key': {'player_id_bundle': '12345678-1234-1234-1234-123456789012_BANANA_BUNDLE', 'bundle_item_key': 'SCORE2'}}}]})87    def 
test_delete_bundle_no_params_passed_returns_success(self):88        # Arrange89        event = self.get_lambda_event()90        event['queryStringParameters'] = None91        index.ddb_resource.Table().query.side_effect = [{'Items': [{'player_id_bundle': '12345_TestBushel', 'bundle_item_key': 'TestBanana'}]}, {'Items': []}]92        # Act93        result = index.lambda_handler(event, None)94        # Assert95        self.assertEqual(204, result['statusCode'])96        index.ddb_resource.batch_write_item.assert_called_once_with(97            RequestItems={'test_bundleitems_table': [98                {'DeleteRequest': {'Key': {'player_id_bundle': '12345_TestBushel', 'bundle_item_key': 'TestBanana'}}}]})99        index.ddb_resource.Table().delete_item.assert_called()100    @staticmethod101    def get_lambda_event():102        return {103            'resource': '/usergamedata/{bundle_name}',104            'path': '/usergamedata/BANANA_BUNDLE',105            'httpMethod': 'DELETE',106            'headers': {107                'Accept': '*/*',108                'Accept-Encoding': 'gzip, deflate, br',109                'Content-Type': 'application/json',110                'Host': 'abcdefghij.execute-api.us-west-2.amazonaws.com',111                'User-Agent': 'TestAgent',112                'X-Amzn-Trace-Id': 'Root=1-61003a02-7e1356b05a1e1569614c0c46',113                'X-Forwarded-For': '127.0.0.1',114                'X-Forwarded-Port': '443',115                'X-Forwarded-Proto': 'https'116            },117            'multiValueHeaders': {118                'Accept': ['*/*'],119                'Accept-Encoding': ['gzip, deflate, br'],120                'Content-Type': ['application/json'],121                'Host': ['abcdefghij.execute-api.us-west-2.amazonaws.com'],122                'User-Agent': ['TestAgent'],123                'X-Amzn-Trace-Id': ['Root=1-61003a02-7e1356b05a1e1569614c0c46'],124                'X-Forwarded-For': ['127.0.0.1'],125                
'X-Forwarded-Port': ['443'],126                'X-Forwarded-Proto': ['https']127            },128            'queryStringParameters': None,129            'multiValueQueryStringParameters': None,130            'pathParameters': {131                'bundle_name': 'BANANA_BUNDLE'132            },133            'stageVariables': None,134            'requestContext': {135                'resourceId': 'abcdef',136                'authorizer': {137                    'claims': {138                        'sub': '12345678-1234-1234-1234-123456789012',139                        'iss': 'https://cognito-idp.us-west-2.amazonaws.com/us-west-2_123456789',140                        'cognito:username': 'jakschic',141                        'origin_jti': '12345678-1234-1234-1234-123456789012',142                        'aud': '7s24tlabcn8n0defbfoghijsgn',143                        'event_id': '6234d920-b637-4cdf-bd44-3a5e53f51569',144                        'token_use': 'id',145                        'auth_time': '1627438909',146                        'custom:gk_user_id': '12345678-1234-1234-1234-123456789012',147                        'exp': 'Wed Jul 28 03:21:49 UTC 2021',148                        'iat': 'Wed Jul 28 02:21:49 UTC 2021',149                        'jti': '7s24tlabcn8n0defbfoghijsgn',150                        'email': 'xyz@abc.def'151                    }152                },153                'domainName': 'abcdefghij.execute-api.us-west-2.amazonaws.com',154                'apiId': 'abcdefghij'155            },156            'body': None,157            'isBase64Encoded': False...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. 
Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
