Best Python code snippet using localstack_python
dynamodb.py
Source:dynamodb.py  
"""Helpers for reading and writing DynamoDB tables through a shared boto3 session.

The session, region, instance id and base directory are obtained from
``debsaws.common.base_operating_premise()`` when this module is imported.
"""
import json
import queue
import ast
import decimal
import threading
import datetime

from debsaws.common import base_operating_premise
from boto3.dynamodb.conditions import Key, Attr

region, instanceid, basedir, session = base_operating_premise()
ec2_client = session.client('ec2')
startTime = datetime.datetime.now()
nowish = datetime.date.today()
dynamodb_client = session.client("dynamodb")
dynamodb_resource = session.resource('dynamodb')


def list_tables(account):
    """Return the raw ``list_tables`` response of the DynamoDB client.

    ``account`` is accepted for interface compatibility but is not used.
    """
    return dynamodb_client.list_tables()


def describe_table(table):
    """Return the raw ``describe_table`` response for the table named *table*."""
    return dynamodb_client.describe_table(TableName=table)


def get_table_metadata(account, table_name):
    """Return a dict of descriptive attributes of the table *table_name*.

    ``account`` is accepted for interface compatibility but is not used.
    Note that ``primary_key_name`` holds the first ``key_schema`` entry as-is.
    """
    table = dynamodb_resource.Table(table_name)
    return {
        'table_name': table.table_name,
        'table_arn': table.table_arn,
        'num_items': table.item_count,
        'created': table.creation_date_time,
        'table_throughput': table.provisioned_throughput,
        'primary_key_name': table.key_schema[0],
        'status': table.table_status,
        'bytes_size': table.table_size_bytes,
        'global_secondary_indices': table.global_secondary_indexes,
        'table_attributes': table.attribute_definitions,
    }


class DecimalEncoder(json.JSONEncoder):
    """JSON encoder that serialises ``decimal.Decimal`` as ``int`` or ``float``."""

    def default(self, o):
        if isinstance(o, decimal.Decimal):
            # ``%`` on Decimal keeps the sign of the dividend, so test for a
            # non-zero remainder instead of a positive one; otherwise a
            # negative fraction such as -1.5 would be truncated to -1.
            if o % 1 != 0:
                return float(o)
            return int(o)
        return super().default(o)


def _to_plain_rows(items):
    """Round-trip *items* through JSON so Decimal values become int/float."""
    # json.loads instead of ast.literal_eval: JSON true/false/null are not
    # Python literals and made literal_eval raise on such items.
    return [json.loads(json.dumps(item, cls=DecimalEncoder)) for item in items]


def _collect_pages(operation, **kwargs):
    """Call *operation* repeatedly until the response has no LastEvaluatedKey.

    *operation* is a bound ``Table.scan`` or ``Table.query``.  Returns a tuple
    ``(items, last_response)`` where *items* holds the items of every page.
    """
    response = operation(**kwargs)
    items = response['Items']
    while 'LastEvaluatedKey' in response:
        response = operation(ExclusiveStartKey=response['LastEvaluatedKey'],
                             **kwargs)
        items.extend(response['Items'])
    return items, response


def query_all_rows(account, table_name):
    """Scan *table_name* completely and return all items as a list.

    ``account`` is accepted for interface compatibility but is not used.
    """
    table = dynamodb_resource.Table(table_name)
    items, _ = _collect_pages(table.scan)
    return items


def query_some_rows(account, table_name):
    """Return the items of a single scan page of *table_name* as plain dicts.

    Only the first page is read; Decimal values are converted to int/float.
    ``account`` is accepted for interface compatibility but is not used.
    """
    table = dynamodb_resource.Table(table_name)
    return _to_plain_rows(table.scan()['Items'])


def delete_item(table, Item):
    """Delete the item whose key is *Item* from *table*; return the response."""
    dynamodb_table = dynamodb_resource.Table(table)
    return dynamodb_table.delete_item(Key=Item)


def query_item(table, Item):
    """Fetch the item whose key is *Item* from *table*; return the response."""
    dynamodb_table = dynamodb_resource.Table(table)
    return dynamodb_table.get_item(Key=Item)


def scan_hub_items(table, hubuuid, start_ts=1560294000, end_ts=1560340800):
    """Scan *table* for items with ``pk == hubuuid`` and ``sk`` in a range.

    ``start_ts`` / ``end_ts`` bound the ``sk`` attribute (inclusive, as per
    ``between``) and default to the window that used to be hard-coded.

    Returns the final scan response, with ``Items`` replaced by the items of
    *all* pages and ``Count`` set to their number.  Previously only the last
    page's response was returned, silently dropping earlier pages.  Other
    fields (e.g. ``ScannedCount``) still describe the last page only.
    """
    dynamodb_table = dynamodb_resource.Table(table)
    condition = Attr('pk').eq(hubuuid) & Attr('sk').between(start_ts, end_ts)
    items, response = _collect_pages(dynamodb_table.scan,
                                     FilterExpression=condition)
    response['Items'] = items
    response['Count'] = len(items)
    return response


def scan_table_lite(table, filterexpression, limit):
    """Run one scan page on *table* with a filter and a ``Limit``.

    *filterexpression* is a string of Python source that evaluates to a boto3
    condition (it can use ``Attr`` / ``Key``).  Returns the raw response.
    """
    dynamodb_table = dynamodb_resource.Table(table)
    # SECURITY: eval() executes arbitrary code; only pass trusted strings.
    return dynamodb_table.scan(FilterExpression=eval(filterexpression),
                               Limit=limit)


def scan_order_table(table, filterexpression, limit):
    """Run one scan page on *table* with a filter and a ``Limit``.

    The former ``ScanIndexForward=False`` argument was removed: it is a Query
    parameter that Scan does not accept, so the call could never succeed.
    Scan offers no ordering; use a Query if sorted results are required.
    Returns the raw response.
    """
    dynamodb_table = dynamodb_resource.Table(table)
    # SECURITY: eval() executes arbitrary code; only pass trusted strings.
    return dynamodb_table.scan(FilterExpression=eval(filterexpression),
                               Limit=limit)


def scan_table(table, filterexpression):
    """Scan *table* completely with a filter and return all matching items.

    *filterexpression* is a string of Python source that evaluates to a boto3
    condition (it can use ``Attr`` / ``Key``).
    """
    dynamodb_table = dynamodb_resource.Table(table)
    print(f"FilterExpression {filterexpression}")
    # SECURITY: eval() executes arbitrary code; only pass trusted strings.
    # Evaluated once and reused for every page.
    condition = eval(filterexpression)
    items, _ = _collect_pages(dynamodb_table.scan, FilterExpression=condition)
    return items


def item_exists(dest_table, uuid, new_store_start_ts, new_store_end_ts):
    """Return ``'Y'`` if *dest_table* has an item for *uuid* in the sk range, else ``'N'``.

    Only the first query page is inspected.
    """
    dynamodb_table = dynamodb_resource.Table(dest_table)
    response = dynamodb_table.query(
        KeyConditionExpression=Key('pk').eq(uuid)
        & Key('sk').between(new_store_start_ts, new_store_end_ts),
        ProjectionExpression="pk, sk",
    )
    return 'Y' if response['Items'] else 'N'


def query_hub_items_between(table, uuid, date1, date2):
    """Query *table* for ``pk == uuid`` with ``sk`` between *date1* and *date2*.

    Follows pagination and returns the items of all pages as one list.
    """
    dynamodb_table = dynamodb_resource.Table(table)
    condition = Key('pk').eq(uuid) & Key('sk').between(date1, date2)
    items, _ = _collect_pages(dynamodb_table.query,
                              KeyConditionExpression=condition)
    return items


def _batch(iterable, n=1):
    """Yield consecutive slices of *iterable* holding at most *n* elements."""
    total = len(iterable)
    for start in range(0, total, n):
        yield iterable[start:min(start + n, total)]


def insert_item(table, Item):
    """Put a single *Item* into *table* and return the response."""
    dynamodb_table = dynamodb_resource.Table(table)
    return dynamodb_table.put_item(Item=Item)


def put_dynamodb(items, table):
    """Write *items* to *table* in batches of 25, printing each batch size.

    Returns the result of the last ``batch_put`` call (the size of the last
    batch), or ``None`` when *items* is empty.
    """
    response = None  # stays None for empty input instead of being unbound
    for batch in _batch(items, 25):
        # batch_put expects (table, Items); the arguments used to be swapped.
        response = batch_put(table, batch)
        print(response)
    return response


def put_dynamodb_threaded(table, items):
    """Write *items* to *table* in batches of 25 using up to 50 worker threads.

    Blocks until every queued batch has been processed.
    """
    # NOTE(review): all workers share the module-level boto3 resource; confirm
    # this is acceptable for the boto3 version in use.
    threads_to_start = 50
    my_queue = queue.Queue()

    def worker():
        while True:
            try:
                # Non-blocking get: checking empty() and then calling get()
                # could block forever if another worker took the last batch.
                data = my_queue.get_nowait()
            except queue.Empty:
                return
            try:
                batch_put(table, data)
            except Exception as err:
                # Keep the worker alive so the remaining batches get written.
                print(f"EXCEPTION {err} in batch of {len(data)} items")
            finally:
                # Always acknowledge, otherwise my_queue.join() never returns.
                my_queue.task_done()

    for batch in _batch(items, 25):
        my_queue.put(batch)
    print(f"Start queue length: {my_queue.qsize()}")
    for _ in range(min(threads_to_start, my_queue.qsize())):
        # daemon means that all threads will exit when the main thread exits
        threading.Thread(target=worker, daemon=True).start()
    my_queue.join()


def batch_put(table, Items):
    """Put *Items* into *table* through a batch writer; return ``len(Items)``.

    If handing an item to the batch writer raises, the item is retried with a
    single ``insert_item`` call; if that fails too, the error is printed and
    appended to ``{basedir}/fails.txt``.
    """
    filepath = f"{basedir}/fails.txt"
    dynamodb_table = dynamodb_resource.Table(table)
    with dynamodb_table.batch_writer() as batch:
        for item in Items:
            try:
                batch.put_item(Item=item)
            except Exception:  # was a bare except, which also caught KeyboardInterrupt
                try:
                    insert_item(table, item)
                except Exception as r:
                    err = f"EXCEPTION {r} Cant put {item}"
                    print(err)
                    with open(filepath, 'a') as outfile:
                        outfile.write(f"{item}\n {r}\n")
    return len(Items)


def batch_delete(table, Items):
    """Delete every key in *Items* from *table* through a batch writer."""
    dynamodb_table = dynamodb_resource.Table(table)
    with dynamodb_table.batch_writer() as batch:
        for item in Items:
            batch.delete_item(item)


def update_items(table, item):
    """Overwrite *item* in *table* with ``put_item`` and return the response."""
    dynamodb_table = dynamodb_resource.Table(table)
    return dynamodb_table.put_item(Item=item)


# def batch_get_items_by_pk(table, pkname, listofitems):
#     # dynamodb_table = dynamodb_client.Table(table, pkname)
#     response = dynamodb_client.batch_get_item(
#                     RequestItems={
#                         table :
#                             {'Keys': [{pkname: {'SS': [id for id in listofitems]}}]}
#                     })
# ... (the remainder of dynamodb.py is truncated in the source)
run.py
Source:run.py  
"""Exercise basic DynamoDB operations against a local endpoint.

Connection settings come from environment variables (loaded with dotenv);
the table definition and test items are read from ``env.yaml`` / ``item.yaml``.
"""
import functools
import inspect
import os
import time
from pprint import pprint

from dotenv import load_dotenv
import yaml
import boto3
from boto3.dynamodb.conditions import Key, Attr
import pandas as pd

load_dotenv()


def log_func(func):
    """Decorator that prints the function's qualified name and bound arguments before each call."""
    signature = inspect.signature(func)

    @functools.wraps(func)  # keep __name__/__doc__ of the wrapped function
    def wrapper(*args, **kwargs):
        func_args = signature.bind(*args, **kwargs).arguments
        func_args_str = ", ".join(map("{0[0]} = {0[1]!r}".format, func_args.items()))
        print(f"\n{'-'*40}")
        print(f"[Func]\t{func.__qualname__}")
        print(f"[Args]:\t{func_args_str}")
        return func(*args, **kwargs)

    return wrapper


def time_func(func):
    """Decorator that prints how long each call of *func* took."""

    @functools.wraps(func)
    def timeit_wrapper(*args, **kwargs):
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        end_time = time.perf_counter()
        total_time = end_time - start_time
        print(f"\nFunction {func.__name__}{args} Took {total_time:.4f} seconds")
        return result

    return timeit_wrapper


@log_func
def list_tables(dynamodb_resource: boto3.resource):
    """Print every table of *dynamodb_resource*."""
    for table in dynamodb_resource.tables.all():
        print(table)


@log_func
def get_table_schema(dynamodb_resource: boto3.resource, table_name: str):
    """Pretty-print the attribute definitions and key schema of *table_name*."""
    dynamodb_table = dynamodb_resource.Table(table_name)
    pprint(dynamodb_table.attribute_definitions)
    pprint(dynamodb_table.key_schema)


@log_func
def create_table(dynamodb_resource: boto3.resource, params: dict):
    """Create a table from *params* (``create_table`` kwargs) and wait until it exists."""
    table = dynamodb_resource.create_table(**params)
    table.wait_until_exists()
    print(f"Table ({params['TableName']}) created.")


@log_func
def delete_table(dynamodb_resource: boto3.resource, table_name: str):
    """Delete *table_name* and wait until it no longer exists."""
    table = dynamodb_resource.Table(table_name)
    table.delete()
    table.wait_until_not_exists()
    print(f"Table ({table_name}) deleted.")


@log_func
def put_item(dynamodb_resource: boto3.resource, table_name: str, items: dict):
    """Write every element of *items* to *table_name* through a batch writer."""
    dynamodb_table = dynamodb_resource.Table(table_name)
    count = 0  # stays 0 for empty input; the old counter was unbound then
    with dynamodb_table.batch_writer() as batch:
        for count, item in enumerate(items, start=1):
            batch.put_item(Item=item)
    print(f"Items put: {count}")


@log_func
def update_item(dynamodb_resource: boto3.resource, table_name: str, option: dict):
    """Call ``update_item`` on *table_name* with *option* as keyword arguments."""
    dynamodb_table = dynamodb_resource.Table(table_name)
    dynamodb_table.update_item(**option)


@log_func
def truncate_table(dynamodb_resource: boto3.resource, table_name: str):
    """Delete every item of *table_name* by scanning its keys and batch-deleting them."""
    table = dynamodb_resource.Table(table_name)
    table_key_name = [key.get("AttributeName") for key in table.key_schema]
    # Key names are aliased with '#' placeholders so that names clashing with
    # DynamoDB reserved words can still be projected.
    scan_kwargs = {
        "ProjectionExpression": ", ".join("#" + key for key in table_key_name),
        "ExpressionAttributeNames": {"#" + key: key for key in table_key_name},
    }
    counter = 0
    with table.batch_writer() as batch:
        while True:
            page = table.scan(**scan_kwargs)
            counter += page["Count"]
            for item_keys in page["Items"]:
                batch.delete_item(Key=item_keys)
            # Stop on a missing LastEvaluatedKey, not on an empty page: a page
            # can hold zero items while more pages are still pending.
            if "LastEvaluatedKey" not in page:
                break
            scan_kwargs["ExclusiveStartKey"] = page["LastEvaluatedKey"]
    print(f"Items deleted: {counter}")


@log_func
def scan_table(dynamodb_resource: boto3.resource, table_name: str):
    """Scan *table_name* completely, print the items and return them as a list."""
    table = dynamodb_resource.Table(table_name)
    response = table.scan()
    data = response["Items"]
    while "LastEvaluatedKey" in response:
        response = table.scan(ExclusiveStartKey=response["LastEvaluatedKey"])
        data.extend(response["Items"])
    print(data)
    return data


@log_func
def query_table(
    dynamodb_resource: boto3.resource,
    table_name: str,
    options: dict,
):
    """Run one query on *table_name* with *options*; print and return its items.

    Only a single page is fetched so that a ``Limit`` in *options* keeps its
    meaning.  A response without ``Items`` (e.g. ``Select="COUNT"``) yields an
    empty list.
    """
    table = dynamodb_resource.Table(table_name)
    query_res = table.query(**options)
    data = query_res.get("Items", [])
    print(data)
    return data


@log_func
def copy_table(
    src_dynamodb: boto3.client,
    src_table_name: str,
    dst_dynamodb: boto3.client,
    dst_table_name: str,
):
    """Copy every item of *src_table_name* into *dst_table_name*.

    Both ``src_dynamodb`` and ``dst_dynamodb`` are low-level clients; items
    are read with the scan paginator and written one ``put_item`` at a time.
    """
    dynamo_paginator = src_dynamodb.get_paginator("scan")
    dynamodb_response = dynamo_paginator.paginate(
        TableName=src_table_name,
        Select="ALL_ATTRIBUTES",
        ReturnConsumedCapacity="NONE",
        ConsistentRead=True,
    )
    total = 0
    for page in dynamodb_response:
        page_items = page["Items"]
        if not page_items:
            continue
        for item in page_items:
            dst_dynamodb.put_item(TableName=dst_table_name, Item=item)
        total += len(page_items)
        print(f"Items transfered: {len(page_items)}")
    # Report emptiness once, after all pages: a single empty page does not
    # mean the whole table is empty.
    if total == 0:
        print("Original Table is empty.")


if __name__ == "__main__":
    # Shared connection settings for the resource and both clients below.
    connection = {
        "endpoint_url": "http://localhost:8100",
        "region_name": os.environ.get("REGION_NAME"),
        "aws_access_key_id": os.environ.get("AWS_ACCESS_KEY"),
        "aws_secret_access_key": os.environ.get("AWS_SECRET_ACCESS_KEY"),
    }
    dynamodb = boto3.resource("dynamodb", **connection)
    with open("env.yaml", "r", encoding="utf-8") as f:
        params = yaml.safe_load(f)["DynamoDB"]
    with open("item.yaml", "r", encoding="utf-8") as f:
        items = yaml.safe_load(f)["TestItem"]
    get_table_schema(dynamodb, "TestTable")
    list_tables(dynamodb)
    delete_table(dynamodb, "TestTable")
    create_table(dynamodb, params)
    put_item(dynamodb, "TestTable", items)
    scan_table(dynamodb, "TestTable")
    truncate_table(dynamodb, "TestTable")
    scan_table(dynamodb, "TestTable")
    update_option = {
        "Key": {"user_id": "test_user_2", "count": 100},
        "UpdateExpression": "set #attr1 = :_status",
        "ExpressionAttributeNames": {"#attr1": "status"},
        "ExpressionAttributeValues": {":_status": 404},
        "ReturnValues": "UPDATED_NEW",
    }
    update_item(
        dynamodb_resource=dynamodb, table_name="TestTable", option=update_option
    )
    query_options = {
        # "Select": "COUNT",
        "KeyConditionExpression": Key("user_id").eq("test_user_3")
        & Key("count").eq(40),
        # "FilterExpression": Attr("status").eq(1),
    }
    query_table(
        dynamodb_resource=dynamodb, table_name="TestTable", options=query_options
    )
    src_dynamodb_client = boto3.client("dynamodb", **connection)
    dst_dynamodb_client = boto3.client("dynamodb", **connection)
    # ... (the remainder of run.py is truncated in the source)
util.py
Source:util.py  
...38        aws_access_key_id=aws_access_key_id,39        aws_secret_access_key=aws_secret_access_key,40        aws_session_token=aws_session_token41    )42def setup_dynamodb_resource(aws_access_key_id, aws_secret_access_key, aws_session_token, region_name='us-east-1'):43    global dynamodb_resource44    dynamodb_resource = boto3.resource(45        'dynamodb',46        region_name=region_name,47        aws_access_key_id=aws_access_key_id,48        aws_secret_access_key=aws_secret_access_key,49        aws_session_token=aws_session_token50    )51def delete_object(bucket_name, object_name):52    global s3_client53    if not s3_client:54        return None55    try:56        s3_client.delete_object(...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
