How to use dynamodb_resource method in localstack

Best Python code snippet using localstack_python

dynamodb.py

Source:dynamodb.py Github

copy

Full Screen

"""Thin convenience wrappers around boto3 DynamoDB client/resource calls.

A session is obtained once at import time from ``base_operating_premise()``
and shared by every helper in this module.
"""
import json
import queue
import ast
import decimal
import threading
import datetime

from debsaws.common import base_operating_premise
from boto3.dynamodb.conditions import Key, Attr

region, instanceid, basedir, session = base_operating_premise()
ec2_client = session.client('ec2')
startTime = datetime.datetime.now()
nowish = datetime.date.today()
dynamodb_client = session.client("dynamodb")
dynamodb_resource = session.resource('dynamodb')


def list_tables(account):
    """Return the raw ``ListTables`` response of the module-level client.

    ``account`` is accepted for interface compatibility but is not used.
    Only a single call is made; no pagination is performed here.
    """
    return dynamodb_client.list_tables()


def describe_table(table):
    """Return the raw ``DescribeTable`` response for the table named ``table``."""
    return dynamodb_client.describe_table(TableName=table)


def get_table_metadata(account, table_name):
    """Collect a selection of table attributes into a plain dict.

    ``account`` is accepted for interface compatibility but is not used.
    """
    table = dynamodb_resource.Table(table_name)
    return {
        'table_name': table.table_name,
        'table_arn': table.table_arn,
        'num_items': table.item_count,
        'created': table.creation_date_time,
        'table_throughput': table.provisioned_throughput,
        # Holds the whole first key-schema entry, not only its name.
        'primary_key_name': table.key_schema[0],
        'status': table.table_status,
        'bytes_size': table.table_size_bytes,
        'global_secondary_indices': table.global_secondary_indexes,
        'table_attributes': table.attribute_definitions,
    }


class DecimalEncoder(json.JSONEncoder):
    """JSON encoder that serialises ``decimal.Decimal`` as int or float."""

    def default(self, o):
        """Return a float for fractional Decimals and an int for whole ones."""
        if isinstance(o, decimal.Decimal):
            # Decimal's remainder keeps the sign of the dividend, so the
            # fractional test must be "!= 0": a "> 0" test would truncate
            # negative fractions such as Decimal('-1.5') to an int.
            if o % 1 != 0:
                return float(o)
            return int(o)
        return super().default(o)


def _plain_rows(items):
    """Convert DynamoDB items into plain dicts (Decimal -> int/float).

    The items are round-tripped through JSON. ``json.loads`` is used to
    parse the text because JSON ``true``/``false``/``null`` are not valid
    Python literals and would make ``ast.literal_eval`` raise.
    """
    return [json.loads(json.dumps(item, cls=DecimalEncoder)) for item in items]


def query_all_rows(account, table_name):
    """Scan ``table_name`` completely and return all items as a list.

    ``account`` is accepted for interface compatibility but is not used.
    """
    table = dynamodb_resource.Table(table_name)
    response = table.scan()
    data = response['Items']
    while 'LastEvaluatedKey' in response:
        response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'])
        data.extend(response['Items'])
    return data


def query_some_rows(account, table_name):
    """Return the first scan page of ``table_name`` as plain dicts.

    Only one ``scan`` call is made, so the result is limited to a single
    page. ``account`` is accepted for interface compatibility but unused.
    """
    table = dynamodb_resource.Table(table_name)
    response = table.scan()
    return _plain_rows(response['Items'])


def delete_item(table, Item):
    """Delete the item whose primary key is ``Item`` and return the response."""
    dynamodb_table = dynamodb_resource.Table(table)
    return dynamodb_table.delete_item(Key=Item)


def query_item(table, Item):
    """Fetch the item whose primary key is ``Item`` and return the response."""
    dynamodb_table = dynamodb_resource.Table(table)
    return dynamodb_table.get_item(Key=Item)


def scan_hub_items(table, hubuuid, start_ts=1560294000, end_ts=1560340800):
    """Scan ``table`` for items with ``pk == hubuuid`` and ``sk`` in a window.

    Args:
        table: Table name.
        hubuuid: Value the ``pk`` attribute must equal.
        start_ts: Lower bound for ``sk`` (inclusive). The default is the
            value that was previously hard-coded.
        end_ts: Upper bound for ``sk`` (inclusive). The default is the
            value that was previously hard-coded.

    Returns:
        The raw response of the LAST scan page.
    """
    dynamodb_table = dynamodb_resource.Table(table)
    condition = Attr('pk').eq(hubuuid) & Attr('sk').between(start_ts, end_ts)
    response = dynamodb_table.scan(FilterExpression=condition)
    data = response['Items']
    while 'LastEvaluatedKey' in response:
        response = dynamodb_table.scan(
            FilterExpression=condition,
            ExclusiveStartKey=response['LastEvaluatedKey'],
        )
        data.extend(response['Items'])
    rows = _plain_rows(data)
    # NOTE(review): ``rows`` (all pages, converted) is built but the last raw
    # page is what gets returned. This looks unintended; the return value is
    # kept as-is so existing callers are not broken. Confirm and switch to
    # ``return rows`` if no caller relies on the response dict.
    return response


def scan_table_lite(table, filterexpression, limit):
    """Run a single scan page with a filter given as a Python expression string.

    SECURITY: ``filterexpression`` is passed to ``eval``. It must never come
    from untrusted input; prefer passing a condition object instead.
    """
    dynamodb_table = dynamodb_resource.Table(table)
    # print(f"FilterExpression {filterexpression}")
    # print(f"Table {table}")
    condition = eval(filterexpression)  # see SECURITY note in the docstring
    return dynamodb_table.scan(FilterExpression=condition, Limit=limit)


def scan_order_table(table, filterexpression, limit):
    """Run a single scan page, requesting descending order.

    SECURITY: ``filterexpression`` is passed to ``eval``. It must never come
    from untrusted input.
    """
    dynamodb_table = dynamodb_resource.Table(table)
    # print(f"FilterExpression {filterexpression}")
    condition = eval(filterexpression)  # see SECURITY note in the docstring
    # NOTE(review): ScanIndexForward is documented for Query, not for Scan;
    # boto3 parameter validation is expected to reject this call. TODO:
    # confirm and port this function to ``query()`` with a key condition.
    return dynamodb_table.scan(
        FilterExpression=condition,
        ScanIndexForward=False,
        Limit=limit,
    )


def scan_table(table, filterexpression):
    """Scan ``table`` completely with a filter given as an expression string.

    SECURITY: ``filterexpression`` is passed to ``eval``. It must never come
    from untrusted input.

    Returns:
        A list with the items of every page.
    """
    dynamodb_table = dynamodb_resource.Table(table)
    print(f"FilterExpression {filterexpression}")
    # Evaluate the string once instead of once per page.
    condition = eval(filterexpression)  # see SECURITY note in the docstring
    response = dynamodb_table.scan(FilterExpression=condition)
    data = response['Items']
    while 'LastEvaluatedKey' in response:
        response = dynamodb_table.scan(
            FilterExpression=condition,
            ExclusiveStartKey=response['LastEvaluatedKey'],
        )
        data.extend(response['Items'])
    return data


def item_exists(dest_table, uuid, new_store_start_ts, new_store_end_ts):
    """Return ``'Y'`` if any item matches the key range, otherwise ``'N'``.

    Queries ``pk == uuid`` with ``sk`` between the two timestamps and looks
    only at the first result page.
    """
    dynamodb_table = dynamodb_resource.Table(dest_table)
    response = dynamodb_table.query(
        KeyConditionExpression=Key('pk').eq(uuid)
        & Key('sk').between(new_store_start_ts, new_store_end_ts),
        ProjectionExpression="pk, sk",
    )
    return 'Y' if response['Items'] else 'N'


def query_hub_items_between(table, uuid, date1, date2):
    """Query all items with ``pk == uuid`` and ``sk`` between the two dates.

    Follows ``LastEvaluatedKey`` until every page has been collected.
    """
    dynamodb_table = dynamodb_resource.Table(table)
    # print (f"UUID: {uuid}, SK:{date1} - {date2}, TABLE: {table}")
    condition = Key('pk').eq(uuid) & Key('sk').between(date1, date2)
    response = dynamodb_table.query(KeyConditionExpression=condition)
    data = response['Items']
    while 'LastEvaluatedKey' in response:
        response = dynamodb_table.query(
            KeyConditionExpression=condition,
            ExclusiveStartKey=response['LastEvaluatedKey'],
        )
        data.extend(response['Items'])
    return data


def _batch(iterable, n=1):
    """Yield consecutive slices of ``iterable`` holding at most ``n`` elements."""
    total = len(iterable)
    for start in range(0, total, n):
        yield iterable[start:min(start + n, total)]


def insert_item(table, Item):
    """Put a single item into ``table`` and return the response."""
    dynamodb_table = dynamodb_resource.Table(table)
    return dynamodb_table.put_item(Item=Item)


def put_dynamodb(items, table):
    """Write ``items`` to ``table`` in chunks of 25.

    Returns:
        The value returned by ``batch_put`` for the last chunk, or ``None``
        when ``items`` is empty.
    """
    response = None  # stays None for empty input instead of being unbound
    for batch in _batch(items, 25):
        # print(batch)
        # batch_put expects (table, items); the arguments used to be swapped.
        response = batch_put(table, batch)
        if response is not True:
            print(response)
    return response


def put_dynamodb_threaded(table, items):
    """Write ``items`` to ``table`` in chunks of 25 using worker threads."""
    threads_to_start = 50
    my_queue = queue.Queue()

    def worker():
        while True:
            # get_nowait avoids the race between empty() and a blocking
            # get() when another worker takes the last chunk in between.
            try:
                data = my_queue.get_nowait()
            except queue.Empty:
                return
            try:
                batch_put(table, data)
            finally:
                # Always acknowledge the chunk, otherwise join() below would
                # wait forever after a failed batch_put.
                my_queue.task_done()
            # print(f"Q: {my_queue.qsize()}")

    for batch in _batch(items, 25):
        my_queue.put(batch)
    print(f"Start queue length: {my_queue.qsize()}")
    # No point in starting more workers than there are chunks.
    for _ in range(min(threads_to_start, my_queue.qsize())):
        # daemon means that all threads will exit when the main thread exits
        t = threading.Thread(target=worker, daemon=True)
        t.start()
    my_queue.join()


def batch_put(table, Items):
    """Put ``Items`` through a batch writer, falling back to single puts.

    When the batch writer raises for an item, a plain ``put_item`` is tried;
    if that fails as well, the item and the error are printed and appended
    to ``{basedir}/fails.txt``.

    Returns:
        ``len(Items)``, regardless of how many writes failed.
    """
    filepath = f"{basedir}/fails.txt"
    dynamodb_table = dynamodb_resource.Table(table)
    with dynamodb_table.batch_writer() as batch:
        # print(type(Items))
        for item in Items:
            # print(item)
            try:
                batch.put_item(Item=item)
            except Exception:  # was a bare except, which also caught Ctrl-C
                try:
                    insert_item(table, item)
                except Exception as r:
                    err = f"EXCEPTION {r} Cant put {item}"
                    print(err)
                    with open(filepath, 'a') as outfile:
                        outfile.write(f"{item}\n {r}\n")
    return len(Items)


def batch_delete(table, Items):
    """Delete every key in ``Items`` from ``table`` through a batch writer."""
    dynamodb_table = dynamodb_resource.Table(table)
    with dynamodb_table.batch_writer() as batch:
        for item in Items:
            batch.delete_item(Key=item)


def update_items(table, item):
    """Overwrite (put) ``item`` in ``table`` and return the response."""
    dynamodb_table = dynamodb_resource.Table(table)
    return dynamodb_table.put_item(Item=item)


# The listing defined ``delete_item`` a second time here with a body
# identical to the definition above; the duplicate has been dropped.

# def batch_get_items_by_pk(table, pkname, listofitems):
#     # dynamodb_table = dynamodb_client.Table(table, pkname)
#     response = dynamodb_client.batch_get_item(
#         RequestItems={
#             table :
#                 {'Keys': [{pkname: {'SS': [id for id in listofitems]}}]}
#         })
# (the source listing is truncated at this point)

Full Screen

Full Screen

run.py

Source:run.py Github

copy

Full Screen

"""Demo script exercising DynamoDB table operations against a local endpoint."""
import functools
import inspect
import os
import time
from pprint import pprint

from dotenv import load_dotenv
import yaml
import boto3
from boto3.dynamodb.conditions import Key, Attr
import pandas as pd

load_dotenv()


def log_func(func):
    """Decorator that prints the qualified name and bound arguments of a call."""

    @functools.wraps(func)  # keep __name__/__doc__ of the wrapped function
    def wrapper(*args, **kwargs):
        func_args = inspect.signature(func).bind(*args, **kwargs).arguments
        func_args_str = ", ".join(map("{0[0]} = {0[1]!r}".format, func_args.items()))
        print(f"\n{'-'*40}")
        print(f"[Func]\t{func.__qualname__}")
        print(f"[Args]:\t{func_args_str}")
        return func(*args, **kwargs)

    return wrapper


def time_func(func):
    """Decorator that prints how long the wrapped call took."""

    @functools.wraps(func)  # keep __name__/__doc__ of the wrapped function
    def timeit_wrapper(*args, **kwargs):
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        end_time = time.perf_counter()
        total_time = end_time - start_time
        print(f"\nFunction {func.__name__}{args} Took {total_time:.4f} seconds")
        return result

    return timeit_wrapper


@log_func
def list_tables(dynamodb_resource: boto3.resource):
    """Print every table of the given DynamoDB resource."""
    for table in dynamodb_resource.tables.all():
        print(table)


@log_func
def get_table_schema(dynamodb_resource: boto3.resource, table_name: str):
    """Pretty-print the attribute definitions and key schema of a table."""
    dynamodb_table = dynamodb_resource.Table(table_name)
    attrs = dynamodb_table.attribute_definitions
    pprint(attrs)
    schema = dynamodb_table.key_schema
    pprint(schema)


@log_func
def create_table(dynamodb_resource: boto3.resource, params: dict):
    """Create a table from ``params`` and wait until it exists.

    ``params`` is forwarded as keyword arguments to ``create_table`` and
    must contain ``TableName``. Errors propagate unchanged.
    """
    table = dynamodb_resource.create_table(**params)
    table.wait_until_exists()
    print(f"Table ({params['TableName']}) created.")


@log_func
def delete_table(dynamodb_resource: boto3.resource, table_name: str):
    """Delete a table and wait until it is gone. Errors propagate unchanged."""
    table = dynamodb_resource.Table(table_name)
    table.delete()
    table.wait_until_not_exists()
    print(f"Table ({table_name}) deleted.")


@log_func
def put_item(dynamodb_resource: boto3.resource, table_name: str, items: list):
    """Write every entry of ``items`` through a batch writer and print the count.

    ``items`` is iterated, and each element is passed as ``Item``.
    """
    dynamodb_table = dynamodb_resource.Table(table_name)
    written = 0  # stays 0 for empty input instead of being unbound
    with dynamodb_table.batch_writer() as batch:
        for written, item in enumerate(items, start=1):
            batch.put_item(Item=item)
    print(f"Items put: {written}")


@log_func
def update_item(dynamodb_resource: boto3.resource, table_name: str, option: dict):
    """Call ``update_item`` on the table with ``option`` as keyword arguments."""
    dynamodb_table = dynamodb_resource.Table(table_name)
    dynamodb_table.update_item(**option)


@log_func
def truncate_table(dynamodb_resource: boto3.resource, table_name: str):
    """Delete every item of a table, keeping the table itself.

    Only the key attributes are projected by the scan; each returned key is
    then deleted through a batch writer.
    """
    table = dynamodb_resource.Table(table_name)
    table_key_name = [key.get("AttributeName") for key in table.key_schema]
    # "#name" placeholders are used in the projection and mapped back to
    # the real key names via ExpressionAttributeNames.
    scan_kwargs = {
        "ProjectionExpression": ", ".join("#" + key for key in table_key_name),
        "ExpressionAttributeNames": {"#" + key: key for key in table_key_name},
    }
    counter = 0
    with table.batch_writer() as batch:
        while True:
            page = table.scan(**scan_kwargs)
            counter += page["Count"]
            for item_keys in page["Items"]:
                batch.delete_item(Key=item_keys)
            # Stop on a missing LastEvaluatedKey only; an empty page that
            # still carries one must not end the loop early.
            if "LastEvaluatedKey" not in page:
                break
            scan_kwargs["ExclusiveStartKey"] = page["LastEvaluatedKey"]
    print(f"Items deleted: {counter}")


@log_func
def scan_table(dynamodb_resource: boto3.resource, table_name: str):
    """Scan a whole table, print the items and return them as a list."""
    table = dynamodb_resource.Table(table_name)
    response = table.scan()
    data = response["Items"]
    while "LastEvaluatedKey" in response:
        response = table.scan(ExclusiveStartKey=response["LastEvaluatedKey"])
        data.extend(response["Items"])
    print(data)
    return data


@log_func
def query_table(
    dynamodb_resource: boto3.resource,
    table_name: str,
    options: dict,
):
    """Run one ``query`` with ``options`` as keyword arguments.

    Prints and returns the ``Items`` of the single response; further pages
    are not fetched.
    """
    table = dynamodb_resource.Table(table_name)
    query_res = table.query(**options)
    data = query_res["Items"]
    print(data)
    return data


@log_func
def copy_table(
    src_dynamodb: boto3.client,
    src_table_name: str,
    dst_dynamodb: boto3.client,
    dst_table_name: str,
):
    """Copy every item from a source table to a destination table.

    Both arguments are low-level clients: the source is read with the
    ``scan`` paginator and each item is written with ``put_item``.
    """
    dynamo_paginator = src_dynamodb.get_paginator("scan")
    dynamodb_response = dynamo_paginator.paginate(
        TableName=src_table_name,
        Select="ALL_ATTRIBUTES",
        ReturnConsumedCapacity="NONE",
        ConsistentRead=True,
    )
    transferred = 0
    for page in dynamodb_response:
        for item in page["Items"]:
            dst_dynamodb.put_item(TableName=dst_table_name, Item=item)
        if page["Items"]:
            # Running total across pages; the count used to restart per page.
            transferred += len(page["Items"])
            print(f"Items transfered: {transferred}")
    # Report emptiness once for the whole table, not for each empty page.
    if transferred == 0:
        print("Original Table is empty.")


if __name__ == "__main__":
    # Connection settings shared by the resource and both clients below.
    connection = {
        "endpoint_url": "http://localhost:8100",
        "region_name": os.environ.get("REGION_NAME"),
        "aws_access_key_id": os.environ.get("AWS_ACCESS_KEY"),
        "aws_secret_access_key": os.environ.get("AWS_SECRET_ACCESS_KEY"),
    }
    dynamodb = boto3.resource("dynamodb", **connection)

    with open("env.yaml", "r", encoding="utf-8") as f:
        params = yaml.safe_load(f)["DynamoDB"]
    with open("item.yaml", "r", encoding="utf-8") as f:
        items = yaml.safe_load(f)["TestItem"]

    get_table_schema(dynamodb, "TestTable")
    list_tables(dynamodb)
    delete_table(dynamodb, "TestTable")
    create_table(dynamodb, params)
    put_item(dynamodb, "TestTable", items)
    scan_table(dynamodb, "TestTable")
    truncate_table(dynamodb, "TestTable")
    scan_table(dynamodb, "TestTable")

    update_option = {
        "Key": {"user_id": "test_user_2", "count": 100},
        "UpdateExpression": "set #attr1 = :_status",
        "ExpressionAttributeNames": {"#attr1": "status"},
        "ExpressionAttributeValues": {":_status": 404},
        "ReturnValues": "UPDATED_NEW",
    }
    update_item(
        dynamodb_resource=dynamodb, table_name="TestTable", option=update_option
    )

    query_options = {
        # "Select": "COUNT",
        "KeyConditionExpression": Key("user_id").eq("test_user_3")
        & Key("count").eq(40),
        # "FilterExpression": Attr("status").eq(1),
    }
    query_table(
        dynamodb_resource=dynamodb, table_name="TestTable", options=query_options
    )

    src_dynamodb_client = boto3.client("dynamodb", **connection)
    dst_dynamodb_client = boto3.client("dynamodb", **connection)
    # (the source listing is truncated at this point)

Full Screen

Full Screen

util.py

Source:util.py Github

copy

Full Screen

...38 aws_access_key_id=aws_access_key_id,39 aws_secret_access_key=aws_secret_access_key,40 aws_session_token=aws_session_token41 )42def setup_dynamodb_resource(aws_access_key_id, aws_secret_access_key, aws_session_token, region_name='us-east-1'):43 global dynamodb_resource44 dynamodb_resource = boto3.resource(45 'dynamodb',46 region_name=region_name,47 aws_access_key_id=aws_access_key_id,48 aws_secret_access_key=aws_secret_access_key,49 aws_session_token=aws_session_token50 )51def delete_object(bucket_name, object_name):52 global s3_client53 if not s3_client:54 return None55 try:56 s3_client.delete_object(...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful