Best Python code snippet using localstack_python
aws_events_info.py
Source:aws_events_info.py  
...155                return paginator.paginate(156                    NamePrefix=module.params['name_prefix'],157                ), True158            else:159                return client.list_event_sources(160                    NamePrefix=module.params['name_prefix'],161                ), False162        elif module.params['list_partner_event_source_accounts']:163            if client.can_paginate('list_partner_event_source_accounts'):164                paginator = client.get_paginator('list_partner_event_source_accounts')165                return paginator.paginate(166                    EventSourceName=module.params['event_source_name'],167                ), True168            else:169                return client.list_partner_event_source_accounts(170                    EventSourceName=module.params['event_source_name'],171                ), False172        elif module.params['list_partner_event_sources']:173            if client.can_paginate('list_partner_event_sources'):...event_stream_trigger.py
Source:event_stream_trigger.py  
# NOTE: the start of this snippet is truncated in the source. The four lines
# below are the tail of a function whose definition is not visible here; they
# are preserved verbatim as a comment.
#
#             print("Creating event source mapping")
#             return True
#     except ClientError as error:
#         print(error.response)


def list_event_sources(source_arn, lambda_arn):
    '''
    Gets the UUID (and current settings) of the existing event source mapping
    so we can update it. Only the first mapping returned is inspected.

    args:
        source_arn: the arn of the kinesis or dynamodb stream, retrieved from get arn helper functions
        lambda_arn: arn of the lambda (or alias) the mapping belongs to

    returns:
        (uuid, batch_size, enabled, source_arn) when a mapping exists, where
        enabled is True only if the mapping state is 'Enabled';
        False when the call succeeded but no mapping exists;
        None when the call failed or did not answer with HTTP 200.
    '''
    try:
        current_sources = lambdaClient.list_event_source_mappings(
            FunctionName=lambda_arn,
            EventSourceArn=source_arn,
        )
    except ClientError as error:
        print(error.response)
        return None

    if current_sources['ResponseMetadata']['HTTPStatusCode'] != 200:
        return None

    mappings = current_sources['EventSourceMappings']
    if len(mappings) == 0:
        return False

    first = mappings[0]
    return (
        first['UUID'],
        first['BatchSize'],
        first['State'] == 'Enabled',
        first['EventSourceArn'],
    )


def update_event_source_trigger(uuid=None, lambda_arn=None, enabled=False, batch_size=None):
    '''
    updates current event source

    args:
        uuid: the uuid of the current event source
        lambda_arn: arn of the lambda we're working with
        enabled: Boolean, false by default
        batch_size: size of records being processed

    returns:
        True when the update was accepted (HTTP 202), otherwise None.
    '''
    print('Attempting to update current event source')
    try:
        update_event = lambdaClient.update_event_source_mapping(
            UUID=uuid,
            FunctionName=lambda_arn,
            Enabled=enabled,
            BatchSize=batch_size,
        )
    except ClientError as error:
        print(error.response)
        return None

    if update_event['ResponseMetadata']['HTTPStatusCode'] == 202:
        print("Event source updated!")
        return True
    return None


def _resolve_stream(source_type, source_name):
    '''
    Looks up the stream for the given source; exits the process with status 1
    when source_type is neither 'kinesis' nor 'dynamodb'.
    '''
    if source_type == 'kinesis':
        return get_kinesis_stream(source_name)
    if source_type == 'dynamodb':
        return get_dynamo_stream(source_name)
    print("No valid source type found, please use 'kinesis', or 'dynamodb'")
    sys.exit(1)


############## Main Entry Point ##############
def create_event_source(source_type=None, source_name=None, lambda_name=None, batch_size=None, enabled=False, starting_position=False, alias=False):
    '''
    Entrypoint for adding an event source as the lambda invocator

    args:
        source_type: kinesis or dynamodb
        source_name: the name of the resource
        lambda_name: name of the lambda we're going to be triggering
        batch_size: size of batch per lambda invocation
        enabled: boolean, False by default
        starting_position: Choice of 'TRIM_HORIZON', 'LATEST', 'AT_TIMESTAMP'
        alias: passed through to get_lambda_arn

    returns:
        True when the mapping was created, otherwise None.
    '''
    # Get the source type and assign the value of stream accordingly
    stream = _resolve_stream(source_type, source_name)

    # Get lambda arn, this is for aliases
    arn = get_lambda_arn(lambda_name=lambda_name, alias=alias)

    # Create the mapping
    create = create_event_source_trigger(
        lambda_arn=arn,
        event_source=stream,
        enabled=enabled,
        batch_size=batch_size,
        starting_position=starting_position,
    )
    if create:
        print("Event source created successfully")
        return True
    return None


def update_event_source(source_type=None, source_name=None, lambda_name=None, batch_size=None, enabled=False, alias=False):
    '''
    Main entry point for updating the event source. The first thing it does is get the arn for the desired resource
    then it checks to see if we actually need an update by validating our current state against our desired state.
    If they match it exits gracefully, else it will perform an update

    args:
        source_type: kinesis or dynamodb
        source_name: name of the resource (table name or stream name)
        lambda_name: name of the lambda function
        batch_size: integer, size of batch per lambda invocation
        enabled: Boolean, false by default
        alias: passed through to get_lambda_arn

    returns:
        True when no update was needed or the update was accepted;
        False when there is no existing mapping to update;
        None when the update call failed.
    '''
    stream = _resolve_stream(source_type, source_name)

    # Get lambda arn, this is for aliases
    arn = get_lambda_arn(lambda_name=lambda_name, alias=alias)

    # Get all of our current values. list_event_sources returns False/None
    # when there is nothing to unpack, so guard before destructuring.
    current = list_event_sources(source_arn=stream, lambda_arn=arn)
    if not current:
        print('No existing event source mapping found, nothing to update')
        return False
    current_uuid, current_batch_size, current_state, current_source = current

    # Validates what we want vs what we have. Compare position by position:
    # a membership test would let e.g. a batch size of 1 match enabled=True.
    if (current_source, current_batch_size, current_state) == (stream, batch_size, enabled):
        print('No update needed')
        return True

    print('Updating event source')
    return update_event_source_trigger(
        uuid=current_uuid,
        lambda_arn=arn,
        enabled=enabled,
        batch_size=batch_size,
    )


def check_source_mapping(source_type=None, source_name=None, lambda_name=None, alias=False):
    '''
    Gets current event source config and reports back to update-config action. This is used to see if we need to either update
    or create the event source mapping

    args:
        source_type: kinesis or dynamodb
        source_name: name of the resource (table name or stream name)
        lambda_name: name of the lambda function
        alias: passed through to get_lambda_arn
    '''
    stream = _resolve_stream(source_type, source_name)

    # Get arn
    arn = get_lambda_arn(lambda_name=lambda_name, alias=alias)

    # Get our current sources
    get_sources = list_event_sources(source_arn=stream, lambda_arn=arn)

    # If get sources returns true it means the mapping is there and we only need to update, else we'll need to create
    if get_sources:
        return True
    # NOTE(review): the else-branch is truncated in the visible source;
    # returning False is assumed from the comment above - confirm.
    return False
Source:generate_bruteforce_tests.py  
"""Generate ``bruteforce_tests.py`` from the AWS SDK for JavaScript API definitions.

Every ``*.min.json`` file under ``API_DEFINITIONS`` is scanned for read-only
operations (``list_``/``describe_``/``get_``) whose input has no ``required``
marker, and the result is written as a ``BRUTEFORCE_TESTS`` dict literal.
"""
import re
import os
import json

OUTPUT_FMT = 'BRUTEFORCE_TESTS = %s'
OUTPUT_FILE = 'bruteforce_tests.py'
API_DEFINITIONS = 'aws-sdk-js/apis/'

# An operation is only considered when its name contains one of these.
OPERATION_CONTAINS = {
    'list_',
    'describe_',
    'get_',
}

# Operations that are never emitted, even if they look safe.
BLACKLIST_OPERATIONS = {
    'get_apis',
    'get_bucket_notification',
    'get_bucket_notification_configuration',
    'list_web_ac_ls',
    'get_hls_streaming_session_url',
    'describe_scaling_plans',
    'list_certificate_authorities',
    'list_event_sources',
    'get_geo_location',
    'get_checker_ip_ranges',
    'list_geo_locations',
    'list_public_keys',
    # https://twitter.com/AndresRiancho/status/1106680434442809350
    'describe_stacks',
    'describe_service_errors',
    'describe_application_versions',
    'describe_applications',
    'describe_environments',
    'describe_events',
    'list_available_solution_stacks',
    'list_platform_versions',
}


def extract_service_name(filename, api_json):
    """Return the normalized endpoint prefix of *api_json*, or None if absent.

    *filename* is accepted for interface compatibility but is not used.
    """
    try:
        endpoint = api_json['metadata']['endpointPrefix']
    except (KeyError, TypeError):
        # Definition has no usable metadata section.
        return None

    endpoint = endpoint.replace('api.', '')
    # NOTE(review): 'opworks' looks like a typo for 'opsworks'; left unchanged
    # because the generated keys may be consumed elsewhere - confirm.
    endpoint = endpoint.replace('opsworks-cm', 'opworks')
    endpoint = endpoint.replace('acm-pca', 'acm')
    return endpoint


def is_dangerous(operation_name):
    """Return True unless *operation_name* contains one of OPERATION_CONTAINS."""
    return not any(safe in operation_name for safe in OPERATION_CONTAINS)


def extract_operations(api_json):
    """Return the sorted, de-duplicated snake_case names of callable operations.

    An operation is kept when it is not dangerous, not blacklisted, and either
    declares no input or its input definition does not mention "required".
    """
    operations = set()

    for operation_name, operation_data in api_json['operations'].items():
        operation_name = to_underscore(operation_name)

        if is_dangerous(operation_name):
            continue
        if operation_name in BLACKLIST_OPERATIONS:
            continue

        inputs = operation_data.get('input', None)
        # The substring check on the stringified input is deliberately coarse:
        # any mention of "required" anywhere in the input excludes the operation.
        if inputs is None or "required" not in str(inputs):
            operations.add(operation_name)

    return sorted(operations)


def to_underscore(name):
    """Convert a CamelCase operation name to snake_case."""
    s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name)
    return re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1).lower()


def main():
    """Scan the API definitions and write the BRUTEFORCE_TESTS module."""
    bruteforce_tests = dict()

    # Sorted so that operations merged from several files for the same
    # service always end up in the same order.
    for filename in sorted(os.listdir(API_DEFINITIONS)):
        if not filename.endswith('.min.json'):
            continue

        with open(os.path.join(API_DEFINITIONS, filename)) as api_file:
            api_json = json.loads(api_file.read())

        service_name = extract_service_name(filename, api_json)
        if service_name is None:
            print('%s does not define a service name' % filename)
            continue

        operations = extract_operations(api_json)
        if not operations:
            continue

        bruteforce_tests.setdefault(service_name, []).extend(operations)

    output = OUTPUT_FMT % json.dumps(bruteforce_tests,
                                     indent=4,
                                     sort_keys=True)

    with open(OUTPUT_FILE, 'w') as output_file:
        output_file.write(output)


if __name__ == '__main__':
    # NOTE(review): the guard body is truncated in the visible source;
    # calling main() is assumed - confirm.
    main()

# ...Learn to execute automation testing from scratch with LambdaTest Learning
# Hub. Right from setting up the prerequisites to run your first automation
# test, to following best practices and diving deeper into advanced test
# scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to
# help you be proficient with different test automation frameworks i.e.
# Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!
