How to use the transact_write_items method in LocalStack

Best Python code snippet using localstack_python

api.py

Source:api.py Github

copy

Full Screen

...53 )54 # In case service doesn't exist55 if service_resp["Responses"] == [{}]:56 raise ValidationException("Service doesn't exist")57 response = client.transact_write_items(58 TransactItems=[59 {60 "Update": {61 "TableName": lock_table_name,62 "Key": {"LockName": {"S": lock_name}},63 "UpdateExpression": "SET HeldBy = :held_by, Lock_Acquire_DateTime = :date_time, JobId = :job_id",64 "ExpressionAttributeValues": {65 ":held_by": {"S": service_name},66 ":empty_string": {"S": ""},67 ":date_time": {"S": date_time},68 ":job_id": {"S": job_id},69 },70 "ConditionExpression": "HeldBy = :empty_string AND JobId = :empty_string",71 }72 }73 ]74 )75 response["ResponseMetadata"]["JobId"] = job_id76 response["ResponseMetadata"][77 "Message"78 ] = "%s acquired the following lock: %s" % (service_name, lock_name)79 return response80 except ValidationException as e:81 logger.debug(str(e))82 return Response(83 body={"Message": str(e)},84 status_code=400,85 headers={"Content-Type": "application/json"},86 )87 except ClientError as e:88 logger.debug(str(e))89 return Response(90 body={"Message": "Unable to acquire a lock"},91 status_code=409,92 headers={"Content-Type": "application/json"},93 )94 except Exception as e:95 logger.debug(str(e))96 return Response(97 body={"Message": "Unexpected exception occurred during the request"},98 status_code=500,99 headers={"Content-Type": "application/json"},100 )101def retrieve_tables_values(lock_name, lock_table_name):102 """Retrieves values from the lock table."""103 client = SingletonDynamoDBClient.getInstance()104 response = client.transact_get_items(105 TransactItems=[106 {107 "Get": {108 "TableName": lock_table_name,109 "Key": {"LockName": {"S": lock_name}},110 }111 }112 ]113 )114 return response["Responses"]115def dictify_resp(list_of_dict_item):116 """Converts a list of DynamoDB response dictionaries into a simpler key-value dictionary."""117 ret_dict = {}118 for dict_item in list_of_dict_item:119 dict_item = dict_item.get("Item", {})120 for 
k, v in dict_item.items():121 for vk, vv in v.items():122 ret_dict[k] = vv123 return ret_dict124@lockapi.route("/release", methods=["POST"])125def release_lock():126 api_data = lockapi.current_request.json_body127 required_fields = [128 "LockName",129 "ServiceName",130 "JobId",131 "LockTableName",132 "LogTableName",133 ]134 check_fields(api_data, required_fields)135 service_name, lock_name, job_id, lock_table_name, log_table_name = (136 api_data["ServiceName"],137 api_data["LockName"],138 api_data["JobId"],139 api_data["LockTableName"],140 api_data["LogTableName"],141 )142 client = SingletonDynamoDBClient.getInstance()143 try:144 # Put relevant Row information into dict. This is for logging purposes145 read_resp = retrieve_tables_values(lock_name, lock_table_name)146 row_dict = dictify_resp(read_resp)147 date_time = str(datetime.utcnow())148 response = client.transact_write_items(149 TransactItems=[150 {151 "Put": {152 "TableName": log_table_name,153 "Item": {154 "LockName": {"S": lock_name},155 "ServiceName": {"S": service_name},156 "JobId": {"S": job_id},157 "Lock_Acquire_DateTime": {158 "S": row_dict.get("Lock_Acquire_DateTime", "")159 },160 "Lock_Release_DateTime": {"S": date_time},161 },162 }163 },164 {165 "Update": {166 "TableName": lock_table_name,167 "Key": {"LockName": {"S": lock_name}},168 "UpdateExpression": "SET HeldBy = :empty_string, JobId = :empty_string",169 "ExpressionAttributeValues": {170 ":held_by": {"S": service_name},171 ":empty_string": {"S": ""},172 ":job_id": {"S": job_id},173 },174 "ConditionExpression": "HeldBy = :held_by AND JobId = :job_id",175 }176 },177 ]178 )179 response["ResponseMetadata"]["Message"] = (180 "%s released the following lock: %s with JobId: %s"181 % (service_name, lock_name, job_id)182 )183 return response184 except ClientError as e:185 logger.debug(str(e))186 return Response(187 body={"Message": "Unable to release lock"},188 status_code=409,189 headers={"Content-Type": "application/json"},190 )191 except Exception as 
e:192 logger.debug(str(e))193 return Response(194 body={"Mesage": "Unexpected exception occurred during the request"},195 status_code=500,196 headers={"Content-Type": "application/json"},197 )198@lockapi.route("/register_lock", methods=["POST"])199def register_lock():200 api_data = lockapi.current_request.json_body201 required_fields = ["LockName", "LockTableName"]202 check_fields(api_data, required_fields)203 lock_name, lock_table_name = (api_data["LockName"], api_data["LockTableName"])204 client = SingletonDynamoDBClient.getInstance()205 try:206 date_time = str(datetime.utcnow())207 response = client.transact_write_items(208 TransactItems=[209 {210 "Put": {211 "TableName": lock_table_name,212 "Item": {213 "LockName": {"S": lock_name},214 "HeldBy": {"S": ""},215 "Lock_Acquire_DateTime": {"S": date_time},216 "JobId": {"S": ""},217 },218 "ConditionExpression": "attribute_not_exists(LockName)",219 }220 }221 ]222 )223 response["ResponseMetadata"]["Message"] = "Registered Lock: %s" % lock_name224 return response225 except ClientError as e:226 logger.debug(str(e))227 return Response(228 body={"Message": "Unable to register a lock"},229 status_code=409,230 headers={"Content-Type": "application/json"},231 )232 except Exception as e:233 logger.debug(str(e))234 return Response(235 body={"Message": "Unexpected exception occurred during the request"},236 status_code=500,237 headers={"Content-Type": "application/json"},238 )239@lockapi.route("/deregister_lock", methods=["POST"])240def deregister_lock():241 api_data = lockapi.current_request.json_body242 required_fields = ["LockName", "LockTableName"]243 check_fields(api_data, required_fields)244 lock_name, lock_table_name = (api_data["LockName"], api_data["LockTableName"])245 client = SingletonDynamoDBClient.getInstance()246 try:247 response = client.transact_write_items(248 TransactItems=[249 {250 "Delete": {251 "TableName": lock_table_name,252 "Key": {"LockName": {"S": lock_name}},253 "ExpressionAttributeValues": 
{":empty_string": {"S": ""}},254 "ConditionExpression": "HeldBy = :empty_string AND JobId = :empty_string",255 }256 }257 ]258 )259 response["ResponseMetadata"]["Message"] = "Deregistered Lock: %s" % lock_name260 return response261 except ClientError as e:262 logger.debug(str(e))263 return Response(264 body={"Message": "Unable to deregister a lock"},265 status_code=409,266 headers={"Content-Type": "application/json"},267 )268 except Exception as e:269 logger.debug(str(e))270 return Response(271 body={"Message": "Unexpected exception occurred during the request"},272 status_code=500,273 headers={"Content-Type": "application/json"},274 )275@lockapi.route("/register_service", methods=["POST"])276def register_service():277 api_data = lockapi.current_request.json_body278 required_fields = ["ServiceName", "ServiceTableName"]279 check_fields(api_data, required_fields)280 service_name, service_table_name = (281 api_data["ServiceName"],282 api_data["ServiceTableName"],283 )284 client = SingletonDynamoDBClient.getInstance()285 try:286 response = client.transact_write_items(287 TransactItems=[288 {289 "Put": {290 "TableName": service_table_name,291 "Item": {"ServiceName": {"S": service_name},},292 "ConditionExpression": "attribute_not_exists(ServiceName)",293 }294 }295 ]296 )297 response["ResponseMetadata"]["Message"] = (298 "Registered Service: %s" % service_name299 )300 return response301 except ClientError as e:302 logger.debug(str(e))303 return Response(304 body={"Message": "Unable to register a service"},305 status_code=409,306 headers={"Content-Type": "application/json"},307 )308 except Exception as e:309 logger.debug(str(e))310 return Response(311 body={"Message": "Unexpected exception occurred during the request"},312 status_code=500,313 headers={"Content-Type": "application/json"},314 )315@lockapi.route("/deregister_service", methods=["POST"])316def deregister_service():317 api_data = lockapi.current_request.json_body318 required_fields = ["ServiceName", 
"ServiceTableName"]319 check_fields(api_data, required_fields)320 service_name, service_table_name = (321 api_data["ServiceName"],322 api_data["ServiceTableName"],323 )324 client = SingletonDynamoDBClient.getInstance()325 try:326 response = client.transact_write_items(327 TransactItems=[328 {329 "Delete": {330 "TableName": service_table_name,331 "Key": {"ServiceName": {"S": service_name}},332 "ExpressionAttributeValues": {333 ":service_name": {"S": service_name}334 },335 "ConditionExpression": "ServiceName = :service_name",336 }337 }338 ]339 )340 response["ResponseMetadata"]["Message"] = (...

Full Screen

Full Screen

dml_with_transaction.py

Source:dml_with_transaction.py Github

copy

Full Screen

...31 ":val" : 8032 },33 ReturnValues = "UPDATED_NEW"34 )35def transact_write_items():36 # update both items together, and they would be failed37 try:38 dynamodb_client.transact_write_items(39 TransactItems=[40 {41 'Update': {42 'TableName': table_name,43 'Key': {44 'id': {'S': '1'}45 },46 'ConditionExpression': 'balance = :bc',47 'UpdateExpression': 'SET balance = :bu',48 'ExpressionAttributeValues': {49 ':bc': {'N': '100'}, # expected to be 10050 ':bu': {'N': '90'}, # 100 - 10 = 9051 }52 }53 },54 {55 'Update': {56 'TableName': table_name,57 'Key': {58 'id': {'S': '2'}59 },60 'ConditionExpression': 'balance = :bc',61 'UpdateExpression': 'SET balance = :bu',62 'ExpressionAttributeValues': {63 ':bc': {'N': '50'}, # expected to be 5064 ':bu': {'N': '60'}, # 50 + 10 = 6065 }66 }67 }68 ]69 )70 except Exception as e:71 print(f"type = {type(e)} , message = {e}")72 # repay73 response = dynamodb_client.transact_write_items(74 TransactItems=[75 {76 'Update': {77 'TableName': table_name,78 'Key': {79 'id': {'S': '1'}80 },81 'ConditionExpression': 'balance = :bc',82 'UpdateExpression': 'SET balance = :bu',83 'ExpressionAttributeValues': {84 ':bc': {'N': '80'}, # fix 8085 ':bu': {'N': '70'}, # 80 - 10 = 7086 }87 }88 },89 {90 'Update': {91 'TableName': table_name,92 'Key': {93 'id': {'S': '2'}94 },95 'ConditionExpression': 'balance = :bc',96 'UpdateExpression': 'SET balance = :bu',97 'ExpressionAttributeValues': {98 ':bc': {'N': '50'}, # not changed99 ':bu': {'N': '60'}, # 50 + 10 = 60100 }101 }102 }103 ]104 )105 print(response)106def transact_get_items():107 # check it out both items are updated108 response = dynamodb_client.transact_get_items(109 TransactItems=[110 {111 'Get': {112 'TableName': table_name,113 'Key': {114 'id': {'S': '1'}115 }116 }117 },118 {119 'Get': {120 'TableName': table_name,121 'Key': {122 'id': {'S': '2'}123 }124 }125 }126 ]127 )128 print(response)129if __name__ == "__main__":130 init()131 transact_write_items()...

Full Screen

Full Screen

test_transacts.py

Source:test_transacts.py Github

copy

Full Screen

from unittest.mock import MagicMock

import dynamo_io as dio
from dynamo_io.tests import fixtures


def test_transacts():
    """Should call transact_write_items without error."""
    # A MagicMock stands in for the DynamoDB client, so no real request is made.
    client = MagicMock()
    client.transact_write_items.return_value = {}
    dio.transacts(
        client=client,
        table_name="foo",
        puts=[
            fixtures.Foo(first_key="first:a", second_key="second:a"),
            fixtures.FooNoSort(first_key="first:b"),
        ],
        updates=[
            fixtures.Foo(first_key="first:c", second_key="second:c"),
            fixtures.FooNoSort(first_key="first:d"),
        ],
        deletes=[
            fixtures.Foo(first_key="first:e", second_key="second:e"),
            fixtures.FooNoSort(first_key="first:f"),
        ],
    )
...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. The guides take you from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful