Best Python code snippet using localstack_python
import_lambda.py
Source:import_lambda.py  
# External libs
import ast
import boto3
import json
import sys
import os
from botocore.exceptions import ClientError

# Establish our boto resources
client = boto3.client('lambda')
session = boto3.session.Session()
region = session.region_name
ec2Client = boto3.client('ec2')


def import_config(lambda_name, alias=False):
    '''
    Uses the lambda name to grab existing lambda configuration
    and import it into a config dict

    args:
        lambda_name: the name of the lambda you want to import
        alias: the alias, defaults to $LATEST if not present
    returns:
        (config_dict, function_arn) where function_arn is the function ARN
        cut down to its first seven ':'-separated fields
    exits:
        with status 1 if get_function raises ClientError or the returned
        configuration has no 'FunctionName'
    '''
    print('Attempting to import configuration')
    # Create an empty config dict, start with the bare minimum
    config_dict = {
        "initializers": {
            "name": "",
            "description": "",
            "region": region,
            "handler": "",
            "role": "",
        },
        "provisioners": {
            "runtime": "",
            "timeout": 0,
            "mem_size": 0,
        },
    }
    # If the user didn't pass an alias we want to use $LATEST
    alias_used = alias if alias else '$LATEST'

    # No `finally: return ...` here: a return inside finally would discard
    # the SystemExit raised below and then fail on the unbound function_arn.
    try:
        lambda_config = client.get_function(
            FunctionName=lambda_name,
            Qualifier=alias_used,
        )
    except ClientError as error:
        print(error.response)
        sys.exit(1)

    # Set a variable to make things readable later
    config = lambda_config['Configuration']

    # Check to make sure there is a config there
    if 'FunctionName' not in config:
        print("No Lambda found! Please check your config")
        sys.exit(1)

    # Grab our initializers; optional keys fall back to the template defaults
    initializers = config_dict['initializers']
    initializers['name'] = config['FunctionName']
    initializers['handler'] = config.get('Handler', '')
    initializers['description'] = config.get('Description', '')
    initializers['role'] = config['Role'].split('/')[-1]

    # Grab our provisioners
    provisioners = config_dict['provisioners']
    provisioners['timeout'] = config['Timeout']
    provisioners['mem_size'] = config['MemorySize']
    provisioners['runtime'] = config.get('Runtime', '')

    # VPC Work: only recorded when at least one security group is attached
    vpc_config = config.get('VpcConfig') or {}
    sg_ids = vpc_config.get('SecurityGroupIds') or []
    if sg_ids:
        config_dict['vpc_setting'] = {
            "vpc_name": vpc_config['VpcId'],
            "security_group_ids": sg_ids,
        }

    # Tracing config: "PassThrough" is treated as "not configured"
    trace_mode = (config.get('TracingConfig') or {}).get('Mode', 'PassThrough')
    if trace_mode != "PassThrough":
        initializers['tracing_mode'] = trace_mode.lower()

    # Split DLQ ARN so we can grab the service (field 2) and target name (last)
    target_arn = (config.get('DeadLetterConfig') or {}).get('TargetArn')
    if target_arn:
        dead_letter_split = target_arn.split(':')
        config_dict['dead_letter_config'] = {
            "type": dead_letter_split[2],
            "target_name": dead_letter_split[-1],
        }

    # Variables
    variables = (config.get('Environment') or {}).get('Variables')
    if variables is not None:
        config_dict['variables'] = variables

    # Write the alias if needed
    if alias_used != "$LATEST":
        initializers['alias'] = alias

    # Keep the first seven ARN fields, dropping any trailing qualifier field
    function_arn = ":".join(config['FunctionArn'].split(':')[:7])
    return config_dict, function_arn


def import_triggers(lambda_name, alias=False):
    '''
    Uses the lambda name to grab the trigger from the lambda's resource policy

    Only the first policy statement is inspected.

    args:
        lambda_name: the name of the lambda
        alias: optional qualifier passed to get_policy when truthy
    returns:
        (principal, resource): the service principal up to its first '.'
        and the last ':'-separated field of the ArnLike AWS:SourceArn.
        Both are '' when get_policy raises ClientError or the first
        statement has no service principal; resource alone is '' when the
        statement has no ArnLike AWS:SourceArn condition.
    '''
    print("Attempting to retrieve lambda triggers")
    request = {'FunctionName': lambda_name}
    if alias:
        request['Qualifier'] = alias

    try:
        get_lambda_policy = client.get_policy(**request)
    except ClientError:
        print("No policy found")
        return '', ''

    # The policy is a JSON document, so parse it as JSON (not as a Python
    # literal, which would reject true/false/null).
    policy = json.loads(get_lambda_policy['Policy'])
    statements = policy.get('Statement') or []
    if not statements:
        return '', ''

    statement_dict = statements[0]
    principal_block = statement_dict.get('Principal')
    service = principal_block.get('Service') if isinstance(principal_block, dict) else None
    if not isinstance(service, str):
        # Not a service-principal statement; nothing to report as a trigger
        return '', ''

    principal = service.split('.')[0]
    arn_like = (statement_dict.get('Condition') or {}).get('ArnLike') or {}
    resource = arn_like.get('AWS:SourceArn', '').split(":")[-1]
    return principal, resource


def get_sg_name(sg_id):
    '''
    Grabs the NAME of the security groups, we want them because they are
    friendlier to read than the sg code

    args:
        sg_id: the security group ids, returned from import_config; passed
               as-is to describe_security_groups(GroupIds=...)
    returns:
        the GroupName of the FIRST group in the response
    exits:
        with status 1 on ClientError or when no group is returned
    '''
    # NOTE(review): only the first group's name is returned even when several
    # ids are passed; kept as-is because callers receive a single string.
    print('Attempting to retrieve security group names')
    try:
        sg_info = ec2Client.describe_security_groups(
            GroupIds=sg_id
        )
    except ClientError as error:
        print(error.response)
        sys.exit(1)

    groups = sg_info.get('SecurityGroups') or []
    if not groups:
        print("No security groups found for %s" % (sg_id,))
        sys.exit(1)

    group_name = groups[0]['GroupName']
    print("Retrieved name for groups %s" % group_name)
    return group_name


def get_vpc_name(vpc_id):
    '''
    Grabs the NAME of the VPC

    args:
        vpc_id: the id the VPC, returned from import_config
    returns:
        the value of the VPC's 'Name' tag
    exits:
        with status 1 on ClientError or when the VPC has no 'Name' tag
    '''
    print('Attempting to retrieve VPC name')
    try:
        vpc_info = ec2Client.describe_vpcs(
            VpcIds=[vpc_id]
        )
    except ClientError as error:
        print(error.response)
        sys.exit(1)

    vpcs = vpc_info.get('Vpcs') or []
    tags = vpcs[0].get('Tags', []) if vpcs else []
    # Search every tag: the 'Name' tag is not guaranteed to be the first one
    name = next((tag['Value'] for tag in tags if tag.get('Key') == 'Name'), None)
    if name is None:
        print("No VPC found, make sure your VPC is tagged 'Key': 'Name', 'Value': 'Your-VPC'")
        sys.exit(1)

    print('Retrieved VPC name %s' % name)
    return name


def get_tags(lambda_arn):
    '''
    Grabs a tags dict from the current lambda

    args:
        lambda_arn: the arn returned from the import config function
    returns:
        the 'Tags' mapping from list_tags ({} if the key is absent)
    exits:
        with status 1 on ClientError
    '''
    print('Attempting to retrieve tags')
    try:
        response = client.list_tags(Resource=lambda_arn)
    except ClientError as error:
        print(error.response)
        sys.exit(1)
    return response.get('Tags', {})


########### Entrypoint ###########
def import_lambda(lambda_name, alias):
    '''
    The main entry point of the module
    args:
        lambda_name: the name of the lambda
        alias: alias of the lambda
    '''
    config_dict, lambda_arn = import_config(lambda_name=lambda_name, alias=alias)
    tag_dict = get_tags(lambda_arn)
    if len(tag_dict) != 0:
        config_dict['tags'] = tag_dict
    if 'vpc_setting' in config_dict:
        config_dict['vpc_setting']['vpc_name'] = get_vpc_name(config_dict['vpc_setting']['vpc_name'])
        config_dict['vpc_setting']['security_group_ids'] = get_sg_name(config_dict['vpc_setting']['security_group_ids'])
    trigger_method, trigger_source = import_triggers(lambda_name=lambda_name, alias=alias)

    if len(trigger_method) != 0:
        config_dict['trigger'] = {"method": trigger_method, "source": trigger_source}
    # ... (listing truncated here; the remainder of import_lambda is not shown)


# monitoring_lambda.py
Source:monitoring_lambda.py  
# ... (the listing starts mid-function; the enclosing `def` line is not shown)
#     for statement in policy_statements_id_list:
#         print(lambda_wrapper.remove_lambda_permission(
#             logs_client, lambda_name, statement))


def get_lambda_policy_statements(logs_client, lambda_name):
    """Return the Sid of every statement in the lambda's policy.

    The policy JSON is read from the 'Policy' key of
    lambda_wrapper.get_lambda_policy(logs_client, lambda_name).
    Raises KeyError if a statement has no 'Sid'.
    """
    res = json.loads(lambda_wrapper.get_lambda_policy(
        logs_client, lambda_name)['Policy'])
    return [r["Sid"] for r in res["Statement"]]


def exist_subcription_filter(logs_client, log_group_name):
    """Return True if any listed subscription filter targets log_group_name.

    logs_wrapper.list_subcription_filters is iterated as a sequence of
    sequences of filter dicts, each compared on its 'logGroupName' key.
    """
    subcription_filters = logs_wrapper.list_subcription_filters(logs_client)
    return any(
        sf['logGroupName'] == log_group_name
        for sflist in subcription_filters
        for sf in sflist
    )


def _grant_and_subscribe(lambda_client, logs_client, destination_lambda_name,
                         log_group_name, pattern, region, account):
    """Add the lambda permission, then create the subscription filter."""
    response_lambda = lambda_wrapper.add_lambda_permission(
        lambda_client, destination_lambda_name, log_group_name, account, region)
    response_logs = logs_wrapper.create_subcription_filter(
        logs_client, destination_lambda_name, log_group_name, pattern, region, account)
    return {'response': [response_lambda, response_logs]}


def _remove_permissions_for(lambda_client, destination_lambda_name, target):
    """Remove each policy statement whose Sid's second '_' field equals target."""
    res = json.loads(lambda_wrapper.get_lambda_policy(
        lambda_client, destination_lambda_name)['Policy'])
    for s in res['Statement']:
        sid = s.get('Sid', '')
        fields = sid.split('_')
        # A Sid without '_' has no second field and cannot match; indexing
        # [1] unconditionally raised IndexError and aborted the clean-up.
        if len(fields) > 1 and fields[1] == target:
            lambda_wrapper.remove_lambda_permission(
                lambda_client, destination_lambda_name, sid)


def add_log_group_for_monitoring(lambda_client,
                                 logs_client,
                                 destination_lambda_name,
                                 log_group_name, pattern, region, account):
    """Subscribe destination_lambda_name to an existing log group.

    Raises Exception if logs_wrapper.exist_log_group reports the group is
    missing. Returns {'response': [permission_response, filter_response]}.
    """
    if not logs_wrapper.exist_log_group(logs_client, log_group_name):
        raise Exception(f'The {log_group_name} does not exist.')
    return _grant_and_subscribe(lambda_client, logs_client, destination_lambda_name,
                                log_group_name, pattern, region, account)


def remove_log_group_of_monitoring(lambda_client, logs_client, destination_lambda_name, log_group_name):
    """Remove matching permissions and the log group's subscription filter.

    Statements are matched on the last '/'-separated part of log_group_name.
    Returns the result of logs_wrapper.remove_subcription_filter.
    """
    _remove_permissions_for(lambda_client, destination_lambda_name,
                            log_group_name.split('/')[-1])
    return logs_wrapper.remove_subcription_filter(logs_client, log_group_name)


def add_lambda_for_monitoring(lambda_client,
                              logs_client,
                              destination_lambda_name,
                              lambda_name, pattern, region, account):
    """Subscribe destination_lambda_name to the log group /aws/lambda/<lambda_name>.

    The log group is created via logs_wrapper.create_lambda_log_group when
    logs_wrapper.exist_log_group reports it missing.
    Returns {'response': [permission_response, filter_response]}.
    """
    log_group_name = "/aws/lambda/{}".format(lambda_name)
    if not logs_wrapper.exist_log_group(logs_client, log_group_name):
        logs_wrapper.create_lambda_log_group(logs_client, lambda_name)
    return _grant_and_subscribe(lambda_client, logs_client, destination_lambda_name,
                                log_group_name, pattern, region, account)


def remove_lambda_of_monitoring(lambda_client, logs_client, destination_lambda_name, lambda_name):
    """Remove matching permissions and the filter on /aws/lambda/<lambda_name>.

    Returns the result of logs_wrapper.remove_subcription_filter.
    """
    _remove_permissions_for(lambda_client, destination_lambda_name, lambda_name)
    return logs_wrapper.remove_subcription_filter(logs_client, "/aws/lambda/{}".format(lambda_name))


if __name__ == "__main__":
    print(lambda_wrapper.invoke_lambda_function(
        lambda_client, "hello-world-lambda-py", {}))
    # print(json.dumps(logs_wrapper.list_logs_events(...  (listing truncated here)


# utils.py
Source:utils.py  
...15    region_name=os.environ.get('AWS_REGION') or 'us-east-1'16)17cloud_watch = session.client('logs')18lambda_func = session.client('lambda')19def get_lambda_policy():20    try:21        lambda_policy = lambda_func.get_policy(FunctionName=LOG_SHIPPER_FUNCTION).get("Policy")22        return json.loads(lambda_policy)23    except ClientError as error:24        raise error25def remove_lambda_permissions(sid):26    try:27        remove_permission = lambda_func.remove_permission(FunctionName=LOG_SHIPPER_FUNCTION, StatementId=sid)28        return remove_permission29    except ClientError as error:...Learn to execute automation testing from scratch with LambdaTest Learning Hub. It covers everything from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
