Best Python code snippet using localstack_python
Source: lambda_event.py
#!/usr/bin/python
# (c) 2016, Pierre Jodouin <pjodouin@virtualcomputing.solutions>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)

from __future__ import absolute_import, division, print_function
__metaclass__ = type

ANSIBLE_METADATA = {'metadata_version': '1.1',
                    'status': ['preview'],
                    'supported_by': 'community'}

DOCUMENTATION = '''
---
module: lambda_event
short_description: Creates, updates or deletes AWS Lambda function event mappings.
description:
    - This module allows the management of AWS Lambda function event source mappings such as DynamoDB and Kinesis stream
      events via the Ansible framework. These event source mappings are relevant only in the AWS Lambda pull model, where
      AWS Lambda invokes the function.
      It is idempotent and supports "Check" mode. Use module M(lambda) to manage the lambda
      function itself and M(lambda_alias) to manage function aliases.
version_added: "2.2"
author: Pierre Jodouin (@pjodouin), Ryan Brown (@ryansb)
options:
  lambda_function_arn:
    description:
      - The name or ARN of the lambda function.
    required: true
    aliases: ['function_name', 'function_arn']
  state:
    description:
      - Describes the desired state.
    required: false
    default: "present"
    choices: ["present", "absent"]
  alias:
    description:
      - Name of the function alias. Mutually exclusive with C(version).
    required: false
  version:
    description:
      - Version of the Lambda function. Mutually exclusive with C(alias).
    required: false
  event_source:
    description:
      - Source of the event that triggers the lambda function.
    required: false
    default: stream
    choices: ['stream']
  source_params:
    description:
      - Sub-parameters required for event source.
      - I(== stream event source ==)
      - C(source_arn) The Amazon Resource Name (ARN) of the Kinesis or DynamoDB stream that is the event source.
      - C(enabled) Indicates whether AWS Lambda should begin polling the event source. Default is True.
      - C(batch_size) The largest number of records that AWS Lambda will retrieve from your event source at the
        time of invoking your function. Default is 100.
      - C(starting_position) The position in the stream where AWS Lambda should start reading.
        Choices are TRIM_HORIZON or LATEST.
    required: true
requirements:
    - boto3
extends_documentation_fragment:
    - aws
'''

EXAMPLES = '''
---
# Example that creates a lambda event notification for a DynamoDB stream
- hosts: localhost
  gather_facts: no
  vars:
    state: present
  tasks:
  - name: DynamoDB stream event mapping
    lambda_event:
      state: "{{ state | default('present') }}"
      event_source: stream
      function_name: "{{ function_name }}"
      alias: Dev
      source_params:
        source_arn: arn:aws:dynamodb:us-east-1:123456789012:table/tableName/stream/2016-03-19T19:51:37.457
        enabled: True
        batch_size: 100
        starting_position: TRIM_HORIZON

  - name: Show source event
    debug:
      var: lambda_stream_events
'''

RETURN = '''
---
lambda_stream_events:
    description: list of dictionaries returned by the API describing stream event mappings
    returned: success
    type: list
'''

import re
import sys

try:
    import boto3
    from botocore.exceptions import ClientError, ParamValidationError, MissingParametersError
    HAS_BOTO3 = True
except ImportError:
    HAS_BOTO3 = False

from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.ec2 import (HAS_BOTO3, boto3_conn, camel_dict_to_snake_dict, ec2_argument_spec,
                                      get_aws_connection_info)

# ---------------------------------------------------------------------------------------------------
#
#   Helper Functions & classes
#
# ---------------------------------------------------------------------------------------------------

class AWSConnection:
    """
    Create the connection object and client objects as required.
    """

    def __init__(self, ansible_obj, resources, use_boto3=True):
        try:
            self.region, self.endpoint, aws_connect_kwargs = get_aws_connection_info(ansible_obj, boto3=use_boto3)

            self.resource_client = dict()
            if not resources:
                resources = ['lambda']

            resources.append('iam')

            for resource in resources:
                aws_connect_kwargs.update(dict(region=self.region,
                                               endpoint=self.endpoint,
                                               conn_type='client',
                                               resource=resource
                                               ))
                self.resource_client[resource] = boto3_conn(ansible_obj, **aws_connect_kwargs)

            # if region is not provided, then get default profile/session region
            if not self.region:
                self.region = self.resource_client['lambda'].meta.region_name

        except (ClientError, ParamValidationError, MissingParametersError) as e:
            ansible_obj.fail_json(msg="Unable to connect, authorize or access resource: {0}".format(e))

        # set account ID
        try:
            self.account_id = self.resource_client['iam'].get_user()['User']['Arn'].split(':')[4]
        except (ClientError, ValueError, KeyError, IndexError):
            self.account_id = ''

    def client(self, resource='lambda'):
        return self.resource_client[resource]

def pc(key):
    """
    Changes a python key into its Pascal case equivalent. For example, 'this_function_name' becomes 'ThisFunctionName'.

    :param key:
    :return:
    """
    return "".join([token.capitalize() for token in key.split('_')])

def ordered_obj(obj):
    """
    Order object for comparison purposes

    :param obj:
    :return:
    """
    if isinstance(obj, dict):
        return sorted((k, ordered_obj(v)) for k, v in obj.items())
    if isinstance(obj, list):
        return sorted(ordered_obj(x) for x in obj)
    else:
        return obj

def set_api_sub_params(params):
    """
    Sets module sub-parameters to those expected by the boto3 API.

    :param params:
    :return:
    """
    api_params = dict()

    for param in params.keys():
        param_value = params.get(param, None)
        if param_value:
            api_params[pc(param)] = param_value

    return api_params

def validate_params(module, aws):
    """
    Performs basic parameter validation.

    :param module:
    :param aws:
    :return:
    """
    function_name = module.params['lambda_function_arn']

    # validate function name
    if not re.search(r'^[\w\-:]+$', function_name):
        module.fail_json(
            msg='Function name {0} is invalid. Names must contain only alphanumeric characters and hyphens.'.format(function_name)
        )
    if len(function_name) > 64 and not function_name.startswith('arn:aws:lambda:'):
        module.fail_json(msg='Function name "{0}" exceeds 64 character limit'.format(function_name))
    elif len(function_name) > 140 and function_name.startswith('arn:aws:lambda:'):
        module.fail_json(msg='ARN "{0}" exceeds 140 character limit'.format(function_name))

    # check if 'function_name' needs to be expanded in full ARN format
    if not module.params['lambda_function_arn'].startswith('arn:aws:lambda:'):
        function_name = module.params['lambda_function_arn']
        module.params['lambda_function_arn'] = 'arn:aws:lambda:{0}:{1}:function:{2}'.format(aws.region, aws.account_id, function_name)

    qualifier = get_qualifier(module)
    if qualifier:
        function_arn = module.params['lambda_function_arn']
        module.params['lambda_function_arn'] = '{0}:{1}'.format(function_arn, qualifier)

    return

def get_qualifier(module):
    """
    Returns the function qualifier as a version or alias or None.

    :param module:
    :return:
    """
    qualifier = None
    if module.params['version'] > 0:
        qualifier = str(module.params['version'])
    elif module.params['alias']:
        qualifier = str(module.params['alias'])

    return qualifier

# ---------------------------------------------------------------------------------------------------
#
#   Lambda Event Handlers
#
#   This section defines a lambda_event_X function where X is an AWS service capable of initiating
#   the execution of a Lambda function (pull only).
#
# ---------------------------------------------------------------------------------------------------

def lambda_event_stream(module, aws):
    """
    Adds, updates or deletes lambda stream (DynamoDB, Kinesis) event notifications.
    :param module:
    :param aws:
    :return:
    """
    client = aws.client('lambda')
    facts = dict()
    changed = False
    current_state = 'absent'
    state = module.params['state']

    api_params = dict(FunctionName=module.params['lambda_function_arn'])

    # check if required sub-parameters are present and valid
    source_params = module.params['source_params']

    source_arn = source_params.get('source_arn')
    if source_arn:
        api_params.update(EventSourceArn=source_arn)
    else:
        module.fail_json(msg="Source parameter 'source_arn' is required for stream event notification.")

    # check if optional sub-parameters are valid, if present
    batch_size = source_params.get('batch_size')
    if batch_size:
        try:
            source_params['batch_size'] = int(batch_size)
        except ValueError:
            module.fail_json(msg="Source parameter 'batch_size' must be an integer, found: {0}".format(source_params['batch_size']))

    # optional boolean value needs special treatment as not present does not imply False
    source_param_enabled = module.boolean(source_params.get('enabled', 'True'))

    # check if event mapping exist
    try:
        facts = client.list_event_source_mappings(**api_params)['EventSourceMappings']
        if facts:
            current_state = 'present'
    except ClientError as e:
        module.fail_json(msg='Error retrieving stream event notification configuration: {0}'.format(e))

    if state == 'present':
        if current_state == 'absent':
            starting_position = source_params.get('starting_position')
            if starting_position:
                api_params.update(StartingPosition=starting_position)
            else:
                module.fail_json(msg="Source parameter 'starting_position' is required for stream event notification.")

            if source_arn:
                api_params.update(Enabled=source_param_enabled)
            if source_params.get('batch_size'):
                api_params.update(BatchSize=source_params.get('batch_size'))

            try:
                if not module.check_mode:
                    facts = client.create_event_source_mapping(**api_params)
                changed = True
            except (ClientError, ParamValidationError, MissingParametersError) as e:
                module.fail_json(msg='Error creating stream source event mapping: {0}'.format(e))

        else:
            # current_state is 'present'
            api_params = dict(FunctionName=module.params['lambda_function_arn'])
            current_mapping = facts[0]
            api_params.update(UUID=current_mapping['UUID'])
            mapping_changed = False

            # check if anything changed
            if source_params.get('batch_size') and source_params['batch_size'] != current_mapping['BatchSize']:
                api_params.update(BatchSize=source_params['batch_size'])
                mapping_changed = True

            if source_param_enabled is not None:
                if source_param_enabled:
                    if current_mapping['State'] not in ('Enabled', 'Enabling'):
                        api_params.update(Enabled=True)
                        mapping_changed = True
                else:
                    if current_mapping['State'] not in ('Disabled', 'Disabling'):
                        api_params.update(Enabled=False)
                        mapping_changed = True

            if mapping_changed:
                try:
                    if not module.check_mode:
                        facts = client.update_event_source_mapping(**api_params)
                    changed = True
                except (ClientError, ParamValidationError, MissingParametersError) as e:
                    module.fail_json(msg='Error updating stream source event mapping: {0}'.format(e))

    else:
        if current_state == 'present':
            # remove the stream event mapping
            api_params = dict(UUID=facts[0]['UUID'])

            try:
                if not module.check_mode:
                    facts = client.delete_event_source_mapping(**api_params)
                changed = True
            except (ClientError, ParamValidationError, MissingParametersError) as e:
                module.fail_json(msg='Error removing stream source event mapping: {0}'.format(e))

    return camel_dict_to_snake_dict(dict(changed=changed, events=facts))

def main():
    """Produce a list of function suffixes which handle lambda events."""
    this_module = sys.modules[__name__]
    source_choices = ["stream"]

    argument_spec = ec2_argument_spec()
    argument_spec.update(
        dict(
            state=dict(required=False, default='present', choices=['present', 'absent']),
            lambda_function_arn=dict(required=True, default=None, aliases=['function_name', 'function_arn']),
            event_source=dict(required=False, default="stream", choices=source_choices),
            source_params=dict(type='dict', required=True, default=None),
            alias=dict(required=False, default=None),
            version=dict(type='int', required=False, default=0),
        )
    )

    module = AnsibleModule(
        argument_spec=argument_spec,
        supports_check_mode=True,
        mutually_exclusive=[['alias', 'version']],
        required_together=[]
    )

    # validate dependencies
    if not HAS_BOTO3:
        module.fail_json(msg='boto3 is required for this module.')

    aws = AWSConnection(module, ['lambda'])

    validate_params(module, aws)

    this_module_function = getattr(this_module, 'lambda_event_{0}'.format(module.params['event_source'].lower()))

    results = this_module_function(module, aws)

    module.exit_json(**results)

if __name__ == '__main__':
    main()
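Since this page is about snippets used with localstack_python, here is a minimal sketch of exercising the same boto3 event-source-mapping calls the module relies on against a LocalStack endpoint instead of real AWS. The endpoint URL (LocalStack's default edge port 4566), region, dummy credentials, and ARNs below are illustrative assumptions, not part of the module above, and they presume a LocalStack instance with the relevant services already running.

# Sketch only: same call shapes as lambda_event_stream(), pointed at LocalStack.
# Endpoint, region, credentials and ARNs are assumptions for illustration.
import boto3

lambda_client = boto3.client(
    "lambda",
    endpoint_url="http://localhost:4566",   # LocalStack default edge port (assumed)
    region_name="us-east-1",
    aws_access_key_id="test",                # dummy credentials accepted by LocalStack
    aws_secret_access_key="test",
)

# The lookup the module performs before deciding between create/update/delete
mappings = lambda_client.list_event_source_mappings(
    FunctionName="arn:aws:lambda:us-east-1:000000000000:function:myFunction"
)["EventSourceMappings"]

# The create path taken when no mapping exists yet (current_state == 'absent')
if not mappings:
    response = lambda_client.create_event_source_mapping(
        FunctionName="arn:aws:lambda:us-east-1:000000000000:function:myFunction",
        EventSourceArn="arn:aws:dynamodb:us-east-1:000000000000:table/tableName/stream/2016-03-19T19:51:37.457",
        StartingPosition="TRIM_HORIZON",
        BatchSize=100,
        Enabled=True,
    )
    print(response["UUID"], response["State"])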
Source: lambda_last_used.py
1"""2Identify when a lambda function was last modified or invoked. This script3helps identify lambda functions which can be removed from the AWS account4because nobody is using them.5This script is different from the others because it has a lot of external6dependencies and manual steps that need to be done before running it.7Requirements8    1. Enable CloudTrail logging to an S3 bucket9    2. Enable Lambda detailed logging in CloudTrail to get the Invoke calls10    3. Run cloudtrail-partitioner [0], no need to install it, just run the tool11       to get 90 day visibility.12    4. Use Athena to query the events and download the result as CSV. Use the13       this Athena query [1] to get all the data from the previously created14       partitions. Make sure you adjust the dates.15    5. Run this tool16[0] https://github.com/duo-labs/cloudtrail-partitioner/17[1] https://gist.github.com/andresriancho/512bfbae1ad8b175a36d6fdc32b8ccef18"""19import os20import sys21import csv22import json23import argparse24import boto325from dateutil.parser import parse26from datetime import datetime, timezone27from utils.regions import get_all_regions28from lambda_dump import get_lambda_functions_for_region29from utils.boto_error_handling import yield_handling_errors30DEFAULT_DATE = datetime(1970, 1, 1, tzinfo=timezone.utc)31def parse_arguments():32    parser = argparse.ArgumentParser()33    parser.add_argument(34        '--input',35        help='Athena-generated CSV file',36        required=True37    )38    parser.add_argument(39        '--profile',40        help='AWS profile from ~/.aws/credentials',41        required=True42    )43    args = parser.parse_args()44    try:45        session = boto3.Session(profile_name=args.profile)46    except Exception as e:47        print('%s' % e)48        sys.exit(1)49    csv_file = args.input50    if not os.path.exists(csv_file):51        print('%s is not a file' % csv_file)52        sys.exit(1)53    return csv_file, session54class LambdaData(object):55    def __init__(self, event_time, request_parameters, aws_region, event_source):56        self.event_time = event_time57        self.request_parameters = request_parameters58        self.aws_region = aws_region59        self.event_source = event_source60def parse_csv(csv_file):61    lambda_last_used_data = dict()62    with open(csv_file, newline='') as csv_file:63        reader = csv.reader(csv_file)64        headers = next(reader, None)65        for row in reader:66            (event_time, event_name, request_parameters, aws_region, event_source, resources) = row67            request_parameters = json.loads(request_parameters)68            event_time = parse(event_time)69            lambda_function_arn = request_parameters['functionName']70            if lambda_function_arn in lambda_last_used_data:71                # Might need to update the last used time72                if event_time > lambda_last_used_data[lambda_function_arn].event_time:73                    lambda_last_used_data[lambda_function_arn] = LambdaData(event_time,74                                                                            request_parameters,75                                                                            aws_region,76                                                                            event_source)77            else:78                # New lambda function79                lambda_last_used_data[lambda_function_arn] = LambdaData(event_time,80                                                                        
request_parameters,81                                                                        aws_region,82                                                                        event_source)83    return lambda_last_used_data84def sort_key(item):85    return item[1]86def print_output(lambda_last_used_data):87    data = []88    for lambda_function_arn, lambda_data in lambda_last_used_data.items():89        item = (lambda_function_arn, lambda_data.event_time,)90        data.append(item)91    data.sort(key=sort_key, reverse=True)92    for lambda_function_arn, event_time in data:93        if event_time is DEFAULT_DATE:94            msg = '%s NOT used during the tracking period'95            args = (lambda_function_arn,)96            print(msg % args)97        else:98            days_ago = datetime.now() - event_time.replace(tzinfo=None)99            days_ago = days_ago.days100            msg = '%s was last used %s days ago'101            args = (lambda_function_arn, days_ago)102            print(msg % args)103def dump_lambda_functions(session):104    all_lambda_functions = []105    for region in get_all_regions(session):106        client = session.client('lambda', region_name=region)107        iterator = yield_handling_errors(get_lambda_functions_for_region, client)108        for lambda_function in iterator:109            function_name = lambda_function['FunctionArn']110            all_lambda_functions.append(function_name)111    return all_lambda_functions112def merge_all_functions(lambda_last_used_data, all_lambda_functions):113    for lambda_function_arn in all_lambda_functions:114        if lambda_function_arn not in lambda_last_used_data:115            lambda_last_used_data[lambda_function_arn] = LambdaData(DEFAULT_DATE,116                                                                    None,117                                                                    None,118                                                                    None)119    return lambda_last_used_data120def main():121    csv_file, session = parse_arguments()122    print('Parsing CSV file...')123    lambda_last_used_data = parse_csv(csv_file)124    print('Getting all existing AWS Lambda functions...')125    all_lambda_functions = dump_lambda_functions(session)126    lambda_last_used_data = merge_all_functions(lambda_last_used_data, all_lambda_functions)127    print('')128    print('Result:')129    print('')130    print_output(lambda_last_used_data)131if __name__ == '__main__':...test_tiingo_scheduler.py
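parse_csv() unpacks exactly six columns per Athena result row: event time, event name, request parameters (as JSON), region, event source, and resources. A minimal sketch of producing a compatible input file is shown below; the header names, ARN, file name, and profile name are hypothetical, and the real columns come from the Athena query referenced in the docstring above.

# Sketch only: write one CSV row in the six-column shape parse_csv() expects.
# Header names, ARN, file name and profile name are illustrative assumptions.
import csv
import json

row = [
    "2021-06-01T10:15:00Z",                                   # event time
    "Invoke",                                                 # event name
    json.dumps({"functionName":
                "arn:aws:lambda:us-east-1:123456789012:function:example"}),
    "us-east-1",                                              # aws region
    "lambda.amazonaws.com",                                   # event source
    "[]",                                                     # resources
]

with open("athena-results.csv", "w", newline="") as fh:
    writer = csv.writer(fh)
    writer.writerow(["eventtime", "eventname", "requestparameters",
                     "awsregion", "eventsource", "resources"])
    writer.writerow(row)

# The script would then be run as, for example:
#   python lambda_last_used.py --input athena-results.csv --profile my-profile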
Source: test_tiingo_scheduler.py
import json
from datetime import datetime
from typing import List

import boto3
import pytest

from .aws_helpers import get_matching_s3_keys
from .share import lambda_function_check_setup, lambda_function_setup_contain_env_var

def test_tiingo_scheduler_lambda_python_setup(version, environment, terraform_output):
    lambda_function_name = terraform_output["tiingo_scheduler_name"]["value"]
    region = terraform_output["region"]["value"]
    expected_timeout = 300
    lambda_function_check_setup(lambda_function_name, region, expected_timeout)

@pytest.mark.parametrize(
    "env_var",
    [
        ("AWS_S3_BUCKET"),
        ("TIINGO_FETCHER_FUNCTION_NAME"),
        ("LAMBDA_INVOCATION_TYPE"),
        ("TIINGO_TICKERS_FILE"),
    ],
)
def test_tiingo_scheduler_env_var_exist(
    version, environment, terraform_output, env_var
):
    lambda_function_name = terraform_output["tiingo_scheduler_name"]["value"]
    region = terraform_output["region"]["value"]
    lambda_function_setup_contain_env_var(lambda_function_name, region, env_var)

@pytest.mark.parametrize(
    "event_rule, json_param",
    [
        (
            "Monday_to_Friday_8pm_HKT",
            '{"filters": [{"exchange": "SHE"}, {"exchange": "SHG"}]}',
        )
    ],
)
def test_target_has_right_param_for_rule(
    terraform_output, environment, event_rule: str, json_param: str
):
    lambda_function_arn = terraform_output["tiingo_scheduler_arn"]["value"]
    region = terraform_output["region"]["value"]
    events_client = boto3.client("events", region_name=region)
    targets = events_client.list_rule_names_by_target(TargetArn=lambda_function_arn)
    assert len(targets["RuleNames"]) == 5
    #
    # assert targets["Targets"][0]["Arn"] == lambda_function_arn
    # assert targets["Targets"][0]["Input"] == json_param

@pytest.mark.parametrize(
    "json_filter,expect_items",
    [
        (
            '[{"exchange": "NASDAQ", "asset_type": "Mutual Fund"}]',
            [
                "BVNSC",
                "CGVIC",
                "CIVEC",
                "CRUSC",
                "EMFN",
                "EMMT",
                "EVGBC",
                "EVLMC",
                "EVSTC",
                "EWJE",
                "FEFN",
                "FOANC",
                "IVENC",
                "IVFGC",
                "MOGLC",
                "NUCL",
                "PETZC",
                "RPIBC",
                "SRRIX",
                "XBGIOX",
                "XJEMDX",
            ],
        )
    ],
)
def test_invoke_tiingo_scheduler(
    terraform_output, json_filter: str, expect_items: List[str]
):
    region = terraform_output["region"]["value"]
    lambda_function_arn = terraform_output["tiingo_scheduler_name"]["value"]
    s3_bucket_name = terraform_output["s3_bucket_name"]["value"]
    base_path = "market_data_tiingo_scheduler"
    event = {"base_path": base_path, "filters": json.loads(json_filter)}
    payload = json.dumps(event)
    client_lambda = boto3.client("lambda", region_name=region)
    invoke_result = client_lambda.invoke(
        FunctionName=lambda_function_arn,
        InvocationType="RequestResponse",
        Payload=payload,
    )
    assert 200 == invoke_result["StatusCode"]
    assert "Handled" == invoke_result.get("FunctionError", "Handled")
    start_time = datetime.now()
    max_wait_time_s = 300
    result_file_keys = set(
        [f"{base_path}/{ticker}/1d/data.csv" for ticker in expect_items]
    )
    while True:
        file_keys = set(
            get_matching_s3_keys(s3_bucket_name, f"{base_path}/", "data.csv")
        )
        if len(file_keys) == len(result_file_keys):
            break
        loop_time = datetime.now()
        if (loop_time - start_time).seconds > max_wait_time_s:
            assert False, f"Max wait time greater than {max_wait_time_s}s"
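The polling loop above relies on get_matching_s3_keys from the test package's aws_helpers module, which is not shown on this page. Below is a plausible sketch, assuming the helper simply yields keys under a prefix that end with a given suffix; the real implementation may differ.

# Hypothetical sketch of aws_helpers.get_matching_s3_keys as used above:
# yield every key in the bucket that starts with `prefix` and ends with `suffix`.
# The actual helper is not included on this page and may differ.
import boto3

def get_matching_s3_keys(bucket: str, prefix: str = "", suffix: str = ""):
    s3 = boto3.client("s3")
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            key = obj["Key"]
            if key.endswith(suffix):
                yield key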
