Best Python code snippet using autotest_python
app.py
Source:app.py  
1from flask import Flask, request2from flask_cors import CORS3import os4from command.initialize import Initialize5from command.collect import Collect6from command.prepare import Prepare7from dotenv import load_dotenv, find_dotenv8app = Flask(__name__)9CORS(app)10load_dotenv(find_dotenv())11AWS_ACCESS_KEY_ID = os.environ.get("AWS_ACCESS_KEY_ID")12AWS_SECRET_ACCESS_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY")13@app.route('/api')14def root_route():15    return 'Hello World'16@app.route('/api/newPost', methods=['POST'])17def initialize():18    # clear all the data inside the VM19    os.system('./tools/shell/jobClearJSON.sh')20    # get the input21    input = request.json22    print(AWS_ACCESS_KEY_ID)23    print(AWS_SECRET_ACCESS_KEY)24    # make class instance25    initialize_command = Initialize(input['region'], input['user_id'],26                                    input['account_name'], input['aws_access_key_id'], input['aws_secret_access_key'],27                                    input['aws_session_token'])28    # execute class function29    initialize_command.validate_input()30    initialize_command.check_identity()31    initialize_command.send_request_to_s3(32        AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)33    requestID = initialize_command.make_request(34        AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)35    print('The requestID is:', requestID)36    # print parameter inside class37    initialize_command.print_class_parameter()38    # get account id to initialize collect class39    account_id = initialize_command.get_account_id()40    user_aws_access_key_id = initialize_command.get_aws_access_key_id()41    user_aws_secret_access_key = initialize_command.get_aws_secret_access_key()42    # get the region43    regions = initialize_command.get_region()44    for region in regions:45        # initialize class46        collect_command = Collect(region, account_id, user_aws_access_key_id,47                                  user_aws_secret_access_key)48        # call command functions49        collect_command.get_lambda_list()50        collect_command.get_lambda_policy()51        collect_command.list_sqs_queue()52        collect_command.get_sqs_queue()53        collect_command.list_sns_topic()54        collect_command.get_sns_topic_attribute()55        collect_command.list_dynamodb_table()56        collect_command.scan_dynamodb_table()57        collect_command.describe_dynamodb_table()58        collect_command.get_cloudtrail_start_sfn()59        collect_command.get_cloudtrail_translate_text()60        collect_command.get_cloudtrail_start_transcription_job()61        collect_command.get_apigw_rest_apis()62        collect_command.get_apigw_resources()63        collect_command.get_cloudtrail_waf_createWebACL()64        collect_command.get_cloudwatch_describe_log_groups()65        collect_command.get_cloudwatch_describe_log_stream()66        collect_command.get_cloudwatch_describe_log_event()67        collect_command.list_s3_bucket()68        collect_command.get_s3_bucket_policy()69        collect_command.describe_rds_instance()70        collect_command.describe_elbv2_load_balancer()71        collect_command.describe_elbv2_target_group()72        collect_command.describe_elbv2_target_health()73        collect_command.list_elb_tags()74        collect_command.describe_ec2_instances()75        collect_command.describe_ec2_vpc_endpoint()76        collect_command.describe_ec2_subnets()77        collect_command.describe_ec2_security_groups()78        collect_command.describe_ec2_route_tables()79        collect_command.describe_ec2_network_interfaces()80        collect_command.describe_ec2_network_acls()81        collect_command.describe_ec2_describe_availability_zones()82        collect_command.describe_ec2_internet_gateway()83        collect_command.list_sfn()84        collect_command.describe_sfn()85        collect_command.list_waf_web_acl()86        collect_command.list_waf_tags()87        collect_command.list_waf_resource()88        collect_command.get_apigw_integration()89        collect_command.get_cloudwatch_cognito_event()90        collect_command.list_cognito_identity_pool()91        collect_command.get_s3_bucket_policy_status()92        collect_command.describe_cognito_identity_pools()93        collect_command.list_sns_tag()94        collect_command.list_sfn_tag()95        collect_command.list_ddb_tag()96        collect_command.list_s3_tag()97        collect_command.list_lambda_tag()98        collect_command.list_resource_group()99        collect_command.list_resource_group_resources()100        collect_command.list_resource_group_tag()101        collect_command.list_vpcs()102    # initialize a class103    cytoscape_node_data=[]104    cytoscape_edge_data=[]105    for i in range(len(regions)):106        prepare_command = Prepare(regions[i], account_id,107        cytoscape_node_data,cytoscape_edge_data)108        # prepare node logic109        prepare_command.prepare_lambda_node()110        prepare_command.prepare_s3_node()111        prepare_command.prepare_dynamodb_node()112        prepare_command.prepare_transcribe_node()113        prepare_command.prepare_translate_node()114        prepare_command.prepare_sns_node()115        prepare_command.prepare_sfn_node()116        prepare_command.prepare_apigw_node()117        prepare_command.prepare_cognito_node()118        prepare_command.prepare_rds_node()119        prepare_command.prepare_ec2_node()120        prepare_command.prepare_rg_node()121        prepare_command.prepare_waf_node()122        prepare_command.prepare_elbv2_node()123        prepare_command.prepare_vpc_node()124        prepare_command.prepare_subnet_node()125        # prepare edge logic126        prepare_command.find_sfn_connection()127        prepare_command.find_edge_lambda_to_sns()128        prepare_command.find_edge_lambda_to_ddb()129        prepare_command.find_edge_lambda_to_transcribe()130        prepare_command.find_edge_lambda_to_translate()131        prepare_command.find_edge_transcribe_to_s3()132        prepare_command.find_edge_s3_to_cognito()133        prepare_command.find_edge_lambda_to_sfn()134        prepare_command.find_edge_apigw_to_lambda()135        prepare_command.find_edge_sns_to_lambda()136        prepare_command.rg_find_connection()137        prepare_command.find_edge_waf_to_elb()138        prepare_command.find_edge_elb_to_ec2()139        # EXPORT INTO JSON FILE140        if(i == len(regions)-1):141            prepare_command.exportToJSON()142            prepare_command.export_JSON_to_S3(143                AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)144            prepare_command.write_to_dynamoDB(145                str(requestID), AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)146    return {147        'status': 'Success',148        'code': 200,149        'id': requestID,150        'message': 'Succeed creating and processing request'151    }152@app.route('/api/update', methods=['POST'])153def update():154    # clear all data inside VM155    os.system('./tools/shell/jobClearJSON.sh')156    # get the input from http157    input = request.json158    print(input)159    # initialize the Initialize Class160    initialize_command = Initialize('', '', '', '', '', '')161    # execute the sequence for update162    initialize_command.process_requestID(163        input['requestID'], AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)164    initialize_command.update_ddb_status(165        input['requestID'], AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)166    initialize_command.get_credentials(167        AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)168    initialize_command.check_identity()169    # print the parameter inside Initialize Class170    initialize_command.print_class_parameter()171    # get account_id172    account_id = initialize_command.get_account_id()173    user_aws_access_key_id = initialize_command.get_aws_access_key_id()174    user_aws_secret_access_key = initialize_command.get_aws_secret_access_key()175    # get the region176    regions = initialize_command.get_region()177    for region in regions:178        # initialize class179        collect_command = Collect(region, account_id, user_aws_access_key_id,180                                  user_aws_secret_access_key)181        # call command functions182        collect_command.get_lambda_list()183        collect_command.get_lambda_policy()184        collect_command.list_sqs_queue()185        collect_command.get_sqs_queue()186        collect_command.list_sns_topic()187        collect_command.get_sns_topic_attribute()188        collect_command.list_dynamodb_table()189        collect_command.scan_dynamodb_table()190        collect_command.describe_dynamodb_table()191        collect_command.get_cloudtrail_start_sfn()192        collect_command.get_cloudtrail_translate_text()193        collect_command.get_cloudtrail_start_transcription_job()194        collect_command.get_apigw_rest_apis()195        collect_command.get_apigw_resources()196        collect_command.get_cloudtrail_waf_createWebACL()197        collect_command.get_cloudwatch_describe_log_groups()198        collect_command.get_cloudwatch_describe_log_stream()199        collect_command.get_cloudwatch_describe_log_event()200        collect_command.list_s3_bucket()201        collect_command.get_s3_bucket_policy()202        collect_command.describe_rds_instance()203        collect_command.describe_elbv2_load_balancer()204        collect_command.describe_elbv2_target_group()205        collect_command.describe_elbv2_target_health()206        collect_command.list_elb_tags()207        collect_command.describe_ec2_instances()208        collect_command.describe_ec2_vpc_endpoint()209        collect_command.describe_ec2_subnets()210        collect_command.describe_ec2_security_groups()211        collect_command.describe_ec2_route_tables()212        collect_command.describe_ec2_network_interfaces()213        collect_command.describe_ec2_network_acls()214        collect_command.describe_ec2_describe_availability_zones()215        collect_command.describe_ec2_internet_gateway()216        collect_command.list_sfn()217        collect_command.describe_sfn()218        collect_command.list_waf_web_acl()219        collect_command.list_waf_tags()220        collect_command.list_waf_resource()221        collect_command.get_apigw_integration()222        collect_command.get_cloudwatch_cognito_event()223        collect_command.list_cognito_identity_pool()224        collect_command.get_s3_bucket_policy_status()225        collect_command.describe_cognito_identity_pools()226        collect_command.list_sns_tag()227        collect_command.list_sfn_tag()228        collect_command.list_ddb_tag()229        collect_command.list_s3_tag()230        collect_command.list_lambda_tag()231        collect_command.list_resource_group()232        collect_command.list_resource_group_resources()233        collect_command.list_resource_group_tag()234        collect_command.list_vpcs()235    # initialize a class236    cytoscape_node_data=[]237    cytoscape_edge_data=[]238    for i in range(len(regions)):239        prepare_command = Prepare(regions[i], account_id,240        cytoscape_node_data,cytoscape_edge_data)241        # prepare node logic242        prepare_command.prepare_lambda_node()243        prepare_command.prepare_s3_node()244        prepare_command.prepare_dynamodb_node()245        prepare_command.prepare_transcribe_node()246        prepare_command.prepare_translate_node()247        prepare_command.prepare_sns_node()248        prepare_command.prepare_sfn_node()249        prepare_command.prepare_apigw_node()250        prepare_command.prepare_cognito_node()251        prepare_command.prepare_rds_node()252        prepare_command.prepare_ec2_node()253        prepare_command.prepare_rg_node()254        prepare_command.prepare_waf_node()255        prepare_command.prepare_elbv2_node()256        prepare_command.prepare_vpc_node()257        prepare_command.prepare_subnet_node()258        # prepare edge logic259        prepare_command.find_sfn_connection()260        prepare_command.find_edge_lambda_to_sns()261        prepare_command.find_edge_lambda_to_ddb()262        prepare_command.find_edge_lambda_to_transcribe()263        prepare_command.find_edge_lambda_to_translate()264        prepare_command.find_edge_transcribe_to_s3()265        prepare_command.find_edge_s3_to_cognito()266        prepare_command.find_edge_lambda_to_sfn()267        prepare_command.find_edge_apigw_to_lambda()268        prepare_command.find_edge_sns_to_lambda()269        prepare_command.rg_find_connection()270        prepare_command.find_edge_waf_to_elb()271        prepare_command.find_edge_elb_to_ec2()272        # EXPORT INTO JSON FILE273        if(i == len(regions)-1):274            prepare_command.exportToJSON()275            prepare_command.export_JSON_to_S3(276                AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)277            prepare_command.write_to_dynamoDB(278                str(input['requestID']), AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)279    return {280        'status': 'Success',281        'code': 200,282        'message': 'Succeed updating metadata file'283    }284if __name__ == '__main__':...ct
Source:ct  
1#!/usr/bin/env python2import os3import glob4import sys5import argparse6import subprocess7import logging as log8from os.path import join, normpath, abspath, islink, split9from pprint import pformat, pprint10from itertools import dropwhile, product, takewhile11import operator12def get_options():13    maxres = 1014    debug = ''15    collect_command = 'git ls-files'16    find_collect_command = 'find . -type f'17    parser = argparse.ArgumentParser(description='command-t like shell command')18    parser.add_argument('target', metavar='target', help='the target pattern to search')19    parser.add_argument('-c', '--command', help='command to collect a list of files (default: git ls-files)')20    parser.add_argument('-f', '--find', action='store_true', help='use find collect command (find . -type f)')21    parser.add_argument('-m', '--max', type=int, help='stop at max results')22    parser.add_argument('-d', '--debug', type=str, help='filename expansion to debug')23    parser.add_argument('-v', '--verbose', action='store_true', help='verbose: print out debug information')24    options = parser.parse_args()25    log.basicConfig(level=log.DEBUG) if options.verbose else log.basicConfig(level=log.INFO)26    maxres = options.max if options.max else maxres27    debug = options.debug if options.debug else debug28    if options.find:29        collect_command = find_collect_command30    elif options.command:31        collect_command = options.command 32    return options.target, collect_command, maxres, debug33def call(command):34    return subprocess.Popen(command.split(),stdout=subprocess.PIPE, stderr=open(os.devnull, 'w')).communicate()35def contains_all(candidate, target):36    return len(list(dropwhile(lambda x: candidate.count(x) > 0, target))) == 037def filter_files(files, target):38    return [x for x in files if contains_all(x.lower(), target)]39def positions_of_char(char, s):40    return [p[0] for p in enumerate(s) if p[1] == char]41def min_diff(a,b):42    diffs = [p[1] - p[0] for p in product(a,b) if p[1] > p[0]]43    return min(diffs) if diffs else 100044def all_chars_positions_of(target, in_candidate):45    return list(takewhile(lambda x: x != None, [positions_of_char(c, in_candidate) for c in target]))46def calculate_min_diff(char_matrix):47    return reduce(operator.add, [min_diff(a,b) for a,b in zip(char_matrix, char_matrix[1:])])48def calculate_score(target, candidate):49    char_matrix = all_chars_positions_of(target,candidate)50    return calculate_min_diff(char_matrix) if char_matrix else 100051if __name__ == '__main__':52    target, collect_command, maxres, debug = get_options()53    if debug:54        log.debug('troubleshooting: %s' % debug.lower())55        score = calculate_score(target, debug.lower())56        log.debug('score: %s' % score)57        sys.exit(0)58    collected, exitcode = call(collect_command)59    if not collected:60        log.error('no results returned by %s' % collect_command)61        sys.exit(1)62    files = collected.split('\n')63    found = filter_files(files, target)64    result = []65    for f in found:66        score = calculate_score(target, f.lower())67        if score < 1000:68            result.append((score,f))69    log.debug('\n'.join([str(f) for f in sorted(result)[:maxres]]))...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
