How to use the batch_write_item method in LocalStack

Best Python code snippet using localstack_python

test_batch_write.py

Source:test_batch_write.py Github

copy

Full Screen

...31 item = self.build_x_item('S', 'forum1', 'subject2',32 ('message', 'S', 'message text'))33 request_body = {'request_items': {self.tname: [{'put_request':34 {'item': item}}]}}35 headers, body = self.client.batch_write_item(request_body)36 self.assertIn('unprocessed_items', body)37 self.assertEqual(body['unprocessed_items'], {})38 key_conditions = {39 self.hashkey: {40 'attribute_value_list': [{'S': 'forum1'}],41 'comparison_operator': 'EQ'42 }43 }44 headers, body = self.client.query(self.tname,45 key_conditions=key_conditions,46 consistent_read=True)47 self.assertEqual(item, body['items'][0])48 @attr(type=['BWI-2'])49 def test_batch_write_all_params(self):50 self._create_test_table(self.build_x_attrs('S'),51 self.tname,52 self.smoke_schema,53 wait_for_active=True)54 item = self.build_x_item('S', 'forum1', 'subject2',55 ('message', 'S', 'message text'))56 request_body = {'request_items': {self.tname: [{'put_request':57 {'item': item}}]}}58 headers, body = self.client.batch_write_item(request_body)59 self.assertIn('unprocessed_items', body)60 self.assertEqual(body['unprocessed_items'], {})61 key_conditions = {62 self.hashkey: {63 'attribute_value_list': [{'S': 'forum1'}],64 'comparison_operator': 'EQ'65 }66 }67 headers, body = self.client.query(self.tname,68 key_conditions=key_conditions,69 consistent_read=True)70 self.assertEqual(item, body['items'][0])71 @attr(type=['BWI-10'])72 def test_batch_write_put_n(self):73 self._create_test_table(self.build_x_attrs('N'),74 self.tname,75 self.smoke_schema,76 wait_for_active=True)77 item = self.build_x_item('N', '1000', '2000', ('message', 'N', '1001'))78 request_body = {'request_items': {self.tname: [{'put_request':79 {'item': item}}]}}80 headers, body = self.client.batch_write_item(request_body)81 key_conditions = {82 self.hashkey: {83 'attribute_value_list': [{'N': '1000'}],84 'comparison_operator': 'EQ'85 }86 }87 headers, body = self.client.query(self.tname,88 key_conditions=key_conditions,89 consistent_read=True)90 
self.assertEqual(item, body['items'][0])91 @attr(type=['BWI-13'])92 def test_batch_write_put_n_ns(self):93 self._create_test_table(self.build_x_attrs('N'),94 self.tname,95 self.smoke_schema,96 wait_for_active=True)97 item = self.build_x_item('N', '1000', '2000',98 ('message', 'NS', ['1001', '1002']))99 request_body = {'request_items': {self.tname: [{'put_request':100 {'item': item}}]}}101 headers, body = self.client.batch_write_item(request_body)102 key_conditions = {103 self.hashkey: {104 'attribute_value_list': [{'N': '1000'}],105 'comparison_operator': 'EQ'106 }107 }108 headers, body = self.client.query(self.tname,109 key_conditions=key_conditions,110 consistent_read=True)111 self.assertEqual(item, body['items'][0])112 @attr(type=['BWI-36'])113 def test_batch_write_delete_s(self):114 self._create_test_table(self.smoke_attrs,115 self.tname,116 self.smoke_schema,117 wait_for_active=True)118 self.put_smoke_item(self.tname, 'forum1', 'subject2',119 'message text', 'John', '10')120 key_conditions = {121 self.hashkey: {122 'attribute_value_list': [{'S': 'forum1'}],123 'comparison_operator': 'EQ'124 },125 self.rangekey: {126 'attribute_value_list': [{'S': 'subject'}],127 'comparison_operator': 'BEGINS_WITH'128 }129 }130 headers, body = self.client.query(self.tname,131 key_conditions=key_conditions,132 consistent_read=True)133 if not body['count']:134 raise exceptions.TempestException("No item to delete.")135 key = {self.hashkey: {'S': 'forum1'}, self.rangekey: {'S': 'subject2'}}136 request_body = {'request_items': {self.tname: [{'delete_request':137 {'key': key}}]}}138 self.client.batch_write_item(request_body)139 headers, body = self.client.query(self.tname,140 key_conditions=key_conditions,141 consistent_read=True)142 self.assertEqual(0, body['count'])143 @attr(type=['BWI-53'])144 def test_batch_write_delete(self):145 self._create_test_table(self.smoke_attrs,146 self.tname,147 self.smoke_schema,148 wait_for_active=True)149 self.put_smoke_item(self.tname, 'forum1', 
'subject2',150 'message text', 'John', '10')151 key_conditions = {152 self.hashkey: {153 'attribute_value_list': [{'S': 'forum1'}],154 'comparison_operator': 'EQ'155 },156 self.rangekey: {157 'attribute_value_list': [{'S': 'subject'}],158 'comparison_operator': 'BEGINS_WITH'159 }160 }161 headers, body = self.client.query(self.tname,162 key_conditions=key_conditions,163 consistent_read=True)164 if not body['count']:165 raise exceptions.TempestException("No item to delete.")166 key = {self.hashkey: {'S': 'forum1'}, self.rangekey: {'S': 'subject2'}}167 request_body = {'request_items': {self.tname: [{'delete_request':168 {'key': key}}]}}169 self.client.batch_write_item(request_body)170 headers, body = self.client.query(self.tname,171 key_conditions=key_conditions,172 consistent_read=True)173 self.assertEqual(0, body['count'])174 @attr(type=['BWI-40'])175 def test_batch_write_put_delete(self):176 self._create_test_table(self.smoke_attrs,177 self.tname,178 self.smoke_schema,179 wait_for_active=True)180 self.put_smoke_item(self.tname, 'forum1', 'subject2',181 'message text', 'John', '10')182 key_conditions = {183 self.hashkey: {184 'attribute_value_list': [{'S': 'forum1'}],185 'comparison_operator': 'EQ'186 },187 self.rangekey: {188 'attribute_value_list': [{'S': 'subject'}],189 'comparison_operator': 'BEGINS_WITH'190 }191 }192 headers, body = self.client.query(self.tname,193 key_conditions=key_conditions,194 consistent_read=True)195 if not body['count']:196 raise exceptions.TempestException("No item to delete.")197 item = self.build_smoke_item('forum1', 'subject3',198 'message text', 'John', '10')199 key = {self.hashkey: {'S': 'forum1'}, self.rangekey: {'S': 'subject2'}}200 request_body = {'request_items': {self.tname: [{'delete_request':201 {'key': key}},202 {'put_request':203 {'item': item}}]}}204 self.client.batch_write_item(request_body)205 headers, body = self.client.query(self.tname,206 key_conditions=key_conditions,207 consistent_read=True)208 self.assertEqual(1, 
body['count'])209 self.assertIn(item, body['items'])210 @attr(type=['BWI-51'])211 def test_batch_write_put_25_tables(self):212 tnames = [rand_name().replace('-', '') for i in range(0, 25)]213 for tname in tnames:214 self._create_test_table(self.smoke_attrs,215 tname,216 self.smoke_schema,217 wait_for_active=True)218 item = self.build_smoke_item('forum1', 'subject3',219 'message text', 'John', '10')220 request_body = {'request_items': dict(221 (tname, [{'put_request': {'item': item}}]) for tname in tnames)}222 key_conditions = {223 self.hashkey: {224 'attribute_value_list': [{'S': 'forum1'}],225 'comparison_operator': 'EQ'226 },227 self.rangekey: {228 'attribute_value_list': [{'S': 'subject'}],229 'comparison_operator': 'BEGINS_WITH'230 }231 }232 self.client.batch_write_item(request_body)233 for tname in tnames:234 headers, body = self.client.query(tname,235 key_conditions=key_conditions,236 consistent_read=True)237 self.assertEqual(1, body['count'])238 self.assertIn(item, body['items'])239 @attr(type=['BWI-16'])240 def test_batch_write_put_s(self):241 self._create_test_table(self.build_x_attrs('S'),242 self.tname,243 self.smoke_schema,244 wait_for_active=True)245 item = self.build_x_item('S', 'forum1', 'subject2',246 ('message', 'S', 'message text'))247 request_body = {'request_items': {self.tname: [{'put_request':248 {'item': item}}]}}249 self.client.batch_write_item(request_body)250 key_conditions = {251 self.hashkey: {252 'attribute_value_list': [{'S': 'forum1'}],253 'comparison_operator': 'EQ'254 }255 }256 headers, body = self.client.query(self.tname,257 key_conditions=key_conditions,258 consistent_read=True)259 self.assertEqual(item, body['items'][0])260 @attr(type=['BWI-14'])261 def test_batch_write_put_b(self):262 self._create_test_table(self.build_x_attrs('S'),263 self.tname,264 self.smoke_schema,265 wait_for_active=True)266 value = base64.b64encode('\xFF')267 item = self.build_x_item('S', 'forum1', 'subject2',268 ('message', 'B', value))269 request_body = 
{'request_items': {self.tname: [{'put_request':270 {'item': item}}]}}271 self.client.batch_write_item(request_body)272 key_conditions = {273 self.hashkey: {274 'attribute_value_list': [{'S': 'forum1'}],275 'comparison_operator': 'EQ'276 }277 }278 headers, body = self.client.query(self.tname,279 key_conditions=key_conditions,280 consistent_read=True)281 self.assertEqual(item, body['items'][0])282 @attr(type=['BWI-15'])283 def test_batch_write_put_bs(self):284 self._create_test_table(self.build_x_attrs('S'),285 self.tname,286 self.smoke_schema,287 wait_for_active=True)288 value1 = base64.b64encode('\xFF\x01')289 value2 = base64.b64encode('\xFF\x02')290 item = self.build_x_item('S', 'forum1', 'subject2',291 ('message', 'BS', [value1, value2]))292 request_body = {'request_items': {self.tname: [{'put_request':293 {'item': item}}]}}294 self.client.batch_write_item(request_body)295 key_conditions = {296 self.hashkey: {297 'attribute_value_list': [{'S': 'forum1'}],298 'comparison_operator': 'EQ'299 }300 }301 headers, body = self.client.query(self.tname,302 key_conditions=key_conditions,303 consistent_read=True)304 self.assertEqual(item, body['items'][0])305 @attr(type=['BWI-50'])306 def test_batch_write_correct_tname_and_put_request(self):307 self._create_test_table(self.build_x_attrs('S'),308 self.tname,309 self.smoke_schema,310 wait_for_active=True)311 item = self.build_x_item('S', 'forum1', 'subject2')312 request_body = {'request_items': {self.tname: [{'put_request':313 {'item': item}}]}}314 self.client.batch_write_item(request_body)315 key_conditions = {316 self.hashkey: {317 'attribute_value_list': [{'S': 'forum1'}],318 'comparison_operator': 'EQ'319 }320 }321 headers, body = self.client.query(self.tname,322 key_conditions=key_conditions,323 consistent_read=True)324 self.assertEqual(item, body['items'][0])325 @attr(type=['BWI-42', 'negative'])326 def test_batch_write_put_delete_same_item(self):327 self._create_test_table(self.smoke_attrs, self.tname,328 
self.smoke_schema,329 wait_for_active=True)330 item = self.build_smoke_item('forum1', 'subject2',331 'message text', 'John', '10')332 key = {333 self.hashkey: {'S': 'forum1'},334 self.rangekey: {'S': 'subject2'}335 }336 request_body = {337 'request_items': {self.tname: [338 {'put_request': {'item': item}},339 {'delete_request': {'key': key}}340 ]}341 }342 with self.assertRaises(exceptions.BadRequest) as raises_cm:343 self.client.batch_write_item(request_body)344 exception = raises_cm.exception345 self.assertIn("ValidationError", exception._error_string)346 self.assertIn("More than one", exception._error_string)347 @attr(type=['BWI-54_3'])348 def test_batch_write_non_existent_table(self):349 tname = 'non_existent_table'350 item = self.build_x_item('S', 'forum1', 'subject2',351 ('message', 'S', 'message text'))352 request_body = {'request_items': {tname: [{'put_request':353 {'item': item}}]}}354 with self.assertRaises(exceptions.NotFound) as raises_cm:355 self.client.batch_write_item(request_body)356 error_msg = raises_cm.exception._error_string357 self.assertIn("Not Found", error_msg)358 self.assertIn("Table 'non_existent_table' does not exist", error_msg)359 @attr(type=['BWI-12'])360 def test_batch_write_put_empty_attr_name(self):361 self._create_test_table(self.build_x_attrs('S'),362 self.tname,363 self.smoke_schema,364 wait_for_active=True)365 item = self.build_x_item('S', 'forum1', 'subject2',366 ('', 'N', '1000'))367 request_body = {'request_items': {self.tname: [{'put_request':368 {'item': item}}]}}369 with self.assertRaises(exceptions.BadRequest) as raises_cm:370 self.client.batch_write_item(request_body)371 error_msg = raises_cm.exception._error_string372 self.assertIn("Bad Request", error_msg)...

Full Screen

Full Screen

test_index.py

Source:test_index.py Github

copy

Full Screen

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

import os
from unittest import TestCase
from unittest.mock import patch, MagicMock
import urllib.parse

# boto3.resource is patched only for the duration of the import below;
# presumably the handler module calls it at import time -- TODO confirm.
with patch("boto3.resource") as boto_resource_mock:
    from functions.usergamedata.DeleteBundle import index

BUNDLES_TABLE_NAME = 'test_bundles_table'
ITEMS_TABLE_NAME = 'test_bundleitems_table'


# Patch Lambda environment variables:
@patch.dict(os.environ, {
    'BUNDLES_TABLE_NAME': BUNDLES_TABLE_NAME,
    'BUNDLE_ITEMS_TABLE_NAME': ITEMS_TABLE_NAME
})
class TestDeleteBundle(TestCase):
    """Tests for ``index.lambda_handler`` of the DeleteBundle function.

    Each test builds an API Gateway style event with ``get_lambda_event``,
    adjusts one part of it, and checks the returned ``statusCode`` and the
    calls made on the mocked ``index.ddb_resource``.
    """

    def setUp(self):
        # Fresh mock per test so call assertions do not leak between tests.
        index.ddb_resource = MagicMock()

    def test_delete_bundle_item_key_missing_returns_400_error(self):
        """A payload of '{}' yields 400 and no batch write."""
        # Arrange
        event = self.get_lambda_event()
        event['queryStringParameters'] = {'payload': urllib.parse.quote('{}')}

        # Act
        result = index.lambda_handler(event, None)

        # Assert
        self.assertEqual(400, result['statusCode'])
        index.ddb_resource.batch_write_item.assert_not_called()

    def test_delete_bundle_path_parameters_is_empty_returns_400_error(self):
        """Empty path parameters yield 400 and no batch write."""
        # Arrange
        event = self.get_lambda_event()
        event['pathParameters'] = {}

        # Act
        result = index.lambda_handler(event, None)

        # Assert
        self.assertEqual(400, result['statusCode'])
        index.ddb_resource.batch_write_item.assert_not_called()

    def test_delete_bundle_bundle_name_empty_returns_400_error(self):
        """A ``bundle_name`` of None yields 400 and no batch write."""
        # Arrange
        event = self.get_lambda_event()
        event['pathParameters'] = {'bundle_name': None}

        # Act
        result = index.lambda_handler(event, None)

        # Assert
        self.assertEqual(400, result['statusCode'])
        index.ddb_resource.batch_write_item.assert_not_called()

    def test_delete_bundle_bundle_name_too_long_returns_400_error(self):
        """A 256-character ``bundle_name`` yields 400 and no batch write."""
        # Arrange
        event = self.get_lambda_event()
        event['pathParameters'] = {'bundle_name': 'x' * 256}

        # Act
        result = index.lambda_handler(event, None)

        # Assert
        self.assertEqual(400, result['statusCode'])
        index.ddb_resource.batch_write_item.assert_not_called()

    def test_delete_bundle_payload_too_long_returns_400_error(self):
        """A 1025-character payload yields 400 and no batch write."""
        # Arrange
        event = self.get_lambda_event()
        event['queryStringParameters'] = {
            'payload': urllib.parse.quote('x' * 1025)
        }

        # Act
        result = index.lambda_handler(event, None)

        # Assert
        self.assertEqual(400, result['statusCode'])
        index.ddb_resource.batch_write_item.assert_not_called()

    def test_delete_bundle_invalid_player_returns_401_error(self):
        """Empty authorizer claims yield 401 and no batch write."""
        # Arrange
        event = self.get_lambda_event()
        event['requestContext'] = {'authorizer': {'claims': {}}}

        # Act
        result = index.lambda_handler(event, None)

        # Assert
        self.assertEqual(401, result['statusCode'])
        index.ddb_resource.batch_write_item.assert_not_called()

    def test_delete_bundle_key_passed_in_query_string_returns_204_success(self):
        """Explicit item keys produce one DeleteRequest per key and a 204."""
        # Arrange
        event = self.get_lambda_event()
        event['queryStringParameters'] = {
            'payload': urllib.parse.quote(
                '{"bundle_item_keys": ["SCORE1","SCORE2"]}')
        }

        # Act
        result = index.lambda_handler(event, None)

        # Assert
        self.assertEqual(204, result['statusCode'])
        index.ddb_resource.batch_write_item.assert_called_once_with(
            RequestItems={'test_bundleitems_table': [
                {'DeleteRequest':
                    {'Key': {'player_id_bundle': '12345678-1234-1234-1234-123456789012_BANANA_BUNDLE', 'bundle_item_key': 'SCORE1'}}},
                {'DeleteRequest':
                    {'Key': {'player_id_bundle': '12345678-1234-1234-1234-123456789012_BANANA_BUNDLE', 'bundle_item_key': 'SCORE2'}}}]})

    def test_delete_bundle_no_params_passed_returns_success(self):
        """Without a payload, queried items are batch-deleted and 204 returned."""
        # Arrange
        event = self.get_lambda_event()
        event['queryStringParameters'] = None
        # First query call returns one item, the second returns none.
        index.ddb_resource.Table().query.side_effect = [
            {'Items': [{'player_id_bundle': '12345_TestBushel',
                        'bundle_item_key': 'TestBanana'}]},
            {'Items': []}
        ]

        # Act
        result = index.lambda_handler(event, None)

        # Assert
        self.assertEqual(204, result['statusCode'])
        index.ddb_resource.batch_write_item.assert_called_once_with(
            RequestItems={'test_bundleitems_table': [
                {'DeleteRequest': {'Key': {'player_id_bundle': '12345_TestBushel', 'bundle_item_key': 'TestBanana'}}}]})
        index.ddb_resource.Table().delete_item.assert_called()

    @staticmethod
    def get_lambda_event():
        """Return a baseline DELETE event for ``/usergamedata/BANANA_BUNDLE``."""
        return {
            'resource': '/usergamedata/{bundle_name}',
            'path': '/usergamedata/BANANA_BUNDLE',
            'httpMethod': 'DELETE',
            'headers': {
                'Accept': '*/*',
                'Accept-Encoding': 'gzip, deflate, br',
                'Content-Type': 'application/json',
                'Host': 'abcdefghij.execute-api.us-west-2.amazonaws.com',
                'User-Agent': 'TestAgent',
                'X-Amzn-Trace-Id': 'Root=1-61003a02-7e1356b05a1e1569614c0c46',
                'X-Forwarded-For': '127.0.0.1',
                'X-Forwarded-Port': '443',
                'X-Forwarded-Proto': 'https'
            },
            'multiValueHeaders': {
                'Accept': ['*/*'],
                'Accept-Encoding': ['gzip, deflate, br'],
                'Content-Type': ['application/json'],
                'Host': ['abcdefghij.execute-api.us-west-2.amazonaws.com'],
                'User-Agent': ['TestAgent'],
                'X-Amzn-Trace-Id': ['Root=1-61003a02-7e1356b05a1e1569614c0c46'],
                'X-Forwarded-For': ['127.0.0.1'],
                'X-Forwarded-Port': ['443'],
                'X-Forwarded-Proto': ['https']
            },
            'queryStringParameters': None,
            'multiValueQueryStringParameters': None,
            'pathParameters': {
                'bundle_name': 'BANANA_BUNDLE'
            },
            'stageVariables': None,
            'requestContext': {
                'resourceId': 'abcdef',
                'authorizer': {
                    'claims': {
                        'sub': '12345678-1234-1234-1234-123456789012',
                        'iss': 'https://cognito-idp.us-west-2.amazonaws.com/us-west-2_123456789',
                        'cognito:username': 'jakschic',
                        'origin_jti': '12345678-1234-1234-1234-123456789012',
                        'aud': '7s24tlabcn8n0defbfoghijsgn',
                        'event_id': '6234d920-b637-4cdf-bd44-3a5e53f51569',
                        'token_use': 'id',
                        'auth_time': '1627438909',
                        'custom:gk_user_id': '12345678-1234-1234-1234-123456789012',
                        'exp': 'Wed Jul 28 03:21:49 UTC 2021',
                        'iat': 'Wed Jul 28 02:21:49 UTC 2021',
                        'jti': '7s24tlabcn8n0defbfoghijsgn',
                        'email': 'xyz@abc.def'
                    }
                },
                'domainName': 'abcdefghij.execute-api.us-west-2.amazonaws.com',
                'apiId': 'abcdefghij'
            },
            'body': None,
            'isBase64Encoded': False
            # ... (excerpt is truncated here on the source page; the literal
            # is closed below only so the snippet parses)
        }

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful