Best Python code snippet using localstack_python
run-on-ec2.py
Source:run-on-ec2.py  
import argparse
import json
import logging
import os
import threading
import uuid
from math import log
from time import time

from aws.AWSConfig import AwsConfig
from aws.ec2Manager import EC2Manager
from aws.s3Manager import S3Manager

# Applications whose per-instance config carries the extra "k"/"run_id" fields.
_APPS_WITH_K = ("powermixing", "butterfly_network", "hbavss_batch", "hbavss_light")


def get_instance_configs(instance_ips, extra=None):
    """Build the JSON configuration for every instance.

    Args:
        instance_ips: IP addresses of the instances; the position of an
            address in this sequence becomes that node's ``my_id``.
        extra: Optional mapping stored under the ``"extra"`` key of every
            config. Defaults to an empty dict.

    Returns:
        A list of ``(my_id, json_string)`` tuples, one per instance.
    """
    # A fresh dict per call avoids sharing one mutable default across calls.
    extra = {} if extra is None else extra
    port = AwsConfig.MPC_CONFIG.PORT
    num_faulty_nodes = AwsConfig.MPC_CONFIG.NUM_FAULTY_NODES
    peers = [f"{ip}:{port}" for ip in instance_ips]

    instance_configs = []
    for my_id in range(len(instance_ips)):
        config = {
            "N": AwsConfig.MPC_CONFIG.N,
            "t": AwsConfig.MPC_CONFIG.T,
            "my_id": my_id,
            "peers": peers,
            # The first NUM_FAULTY_NODES instances are told to induce faults.
            "reconstruction": {"induce_faults": my_id < num_faulty_nodes},
            "skip_preprocessing": True,
            "extra": extra,
        }
        instance_configs.append((my_id, json.dumps(config)))
    return instance_configs


def run_commands_on_instances(
    ec2manager, commands_per_instance_list, verbose=True, output_file_prefix=None
):
    """Run each instance's command list concurrently and wait for all of them.

    Args:
        ec2manager: Object exposing ``execute_command_on_instance``.
        commands_per_instance_list: Iterable of ``[instance_id, commands]``
            pairs.
        verbose: Forwarded to ``execute_command_on_instance``.
        output_file_prefix: Forwarded to ``execute_command_on_instance``.
    """
    node_threads = [
        threading.Thread(
            target=ec2manager.execute_command_on_instance,
            args=[instance_id, commands, verbose, output_file_prefix],
        )
        for instance_id, commands in commands_per_instance_list
    ]
    for thread in node_threads:
        thread.start()
    for thread in node_threads:
        thread.join()


def _upload_share_files(s3manager, mixin, n, t):
    """Upload the ``n`` per-party files of ``mixin`` and return their URLs."""
    return s3manager.upload_files(
        [mixin._build_file_name(n, t, i) for i in range(n)]
    )


def get_ipc_setup_commands(s3manager, instance_ids):
    """Generate and upload triples/zeros, then build the ipc setup commands.

    Returns:
        A list of ``[instance_id, [shell commands]]`` pairs.
    """
    from honeybadgermpc.preprocessing import PreProcessedElements
    from honeybadgermpc.preprocessing import PreProcessingConstants as Constants

    n, t = AwsConfig.TOTAL_VM_COUNT, AwsConfig.MPC_CONFIG.T
    num_triples = AwsConfig.MPC_CONFIG.NUM_TRIPLES
    pp_elements = PreProcessedElements()
    pp_elements.generate_zeros(num_triples, n, t)
    pp_elements.generate_triples(num_triples, n, t)

    triple_urls = _upload_share_files(
        s3manager, pp_elements.mixins[Constants.TRIPLES], n, t
    )
    zero_urls = _upload_share_files(
        s3manager, pp_elements.mixins[Constants.ZEROS], n, t
    )

    setup_commands = [
        [
            instance_id,
            [
                "sudo docker pull %s" % (AwsConfig.DOCKER_IMAGE_PATH),
                "mkdir -p sharedata",
                "cd sharedata; curl -sSO %s" % (triple_urls[i]),
                "cd sharedata; curl -sSO %s" % (zero_urls[i]),
                "mkdir -p benchmark-logs",
            ],
        ]
        for i, instance_id in enumerate(instance_ids)
    ]
    return setup_commands


def get_hbavss_setup_commands(s3manager, instance_ids):
    """Build the hbavss setup commands (image pull and log directory only).

    ``s3manager`` is accepted for signature parity with the other
    ``get_*_setup_commands`` functions and is not used.

    Returns:
        A list of ``[instance_id, [shell commands]]`` pairs.
    """
    setup_commands = [
        [
            instance_id,
            [
                "sudo docker pull %s" % (AwsConfig.DOCKER_IMAGE_PATH),
                "mkdir -p benchmark-logs",
            ],
        ]
        for instance_id in instance_ids
    ]
    return setup_commands


def get_butterfly_network_setup_commands(max_k, s3manager, instance_ids):
    """Generate and upload butterfly-network inputs, then build setup commands.

    Args:
        max_k: Value of ``k`` to generate inputs for; a falsy value falls
            back to ``AwsConfig.MPC_CONFIG.K``.
        s3manager: Object exposing ``upload_files``.
        instance_ids: Instances to build commands for.

    Returns:
        A list of ``[instance_id, [shell commands]]`` pairs.
    """
    from honeybadgermpc.preprocessing import PreProcessedElements
    from honeybadgermpc.preprocessing import PreProcessingConstants as Constants

    n, t = AwsConfig.TOTAL_VM_COUNT, AwsConfig.MPC_CONFIG.T
    k = max_k if max_k else AwsConfig.MPC_CONFIG.K

    logging.info("Starting to create preprocessing files.")
    stime = time()
    num_switches = k * int(log(k, 2)) ** 2
    pp_elements = PreProcessedElements()
    pp_elements.generate_triples(2 * num_switches, n, t)
    pp_elements.generate_one_minus_ones(num_switches, n, t)
    pp_elements.generate_rands(k, n, t)
    logging.info(f"Preprocessing files created in {time()-stime}")

    logging.info("Uploading inputs to AWS S3.")
    stime = time()
    triple_urls = _upload_share_files(
        s3manager, pp_elements.mixins[Constants.TRIPLES], n, t
    )
    rands_urls = _upload_share_files(
        s3manager, pp_elements.mixins[Constants.RANDS], n, t
    )
    one_minus_one_urls = _upload_share_files(
        s3manager, pp_elements.mixins[Constants.ONE_MINUS_ONE], n, t
    )
    logging.info(f"Inputs successfully uploaded in {time()-stime} seconds.")

    setup_commands = [
        [
            instance_id,
            [
                "sudo docker pull %s" % (AwsConfig.DOCKER_IMAGE_PATH),
                "mkdir -p sharedata",
                "cd sharedata; curl -sSO %s" % (triple_urls[i]),
                "cd sharedata; curl -sSO %s" % (one_minus_one_urls[i]),
                "cd sharedata; curl -sSO %s" % (rands_urls[i]),
                "mkdir -p benchmark-logs",
            ],
        ]
        for i, instance_id in enumerate(instance_ids)
    ]
    return setup_commands


def get_powermixing_setup_commands(max_k, runid, s3manager, instance_ids):
    """Generate and upload powermixing inputs, then build setup commands.

    For every instance a ``<runid>-<index>-links`` file listing the uploaded
    input URLs is written to the current directory and uploaded as well.

    Args:
        max_k: Value of ``k`` to generate inputs for; a falsy value falls
            back to ``AwsConfig.MPC_CONFIG.K``.
        runid: Run identifier used to name the per-instance links file.
        s3manager: Object exposing ``upload_file`` and ``upload_files``.
        instance_ids: Instances to build commands for.

    Returns:
        A list of ``[instance_id, [shell commands]]`` pairs.
    """
    from honeybadgermpc.preprocessing import PreProcessedElements
    from honeybadgermpc.preprocessing import PreProcessingConstants as Constants

    n, t = AwsConfig.TOTAL_VM_COUNT, AwsConfig.MPC_CONFIG.T
    k = max_k if max_k else AwsConfig.MPC_CONFIG.K

    logging.info("Starting to create preprocessing files.")
    stime = time()
    pp_elements = PreProcessedElements()
    pp_elements.generate_powers(k, n, t, k)
    pp_elements.generate_rands(k, n, t)
    logging.info(f"Preprocessing files created in {time()-stime}")

    powers = pp_elements.mixins[Constants.POWERS]
    rands = pp_elements.mixins[Constants.RANDS]

    setup_commands = []
    total_time = 0
    logging.info("Uploading input files to AWS S3.")
    for i, instance_id in enumerate(instance_ids):
        # NOTE(review): the same script is uploaded once per instance; this
        # looks loop-invariant but is left as-is because upload_file's side
        # effects are not visible here.
        script_url = s3manager.upload_file("aws/download_input.sh")
        commands = [
            "sudo docker pull %s" % (AwsConfig.DOCKER_IMAGE_PATH),
            f"curl -sSO {script_url}",
            "mkdir -p sharedata",
            "cp download_input.sh sharedata/download_input.sh ",
            "mkdir -p benchmark-logs",
            "ulimit -n 10000",
        ]

        file_names = []
        for j in range(k):
            file_names.append(
                powers.build_filename(
                    n, t, i, prefix=f"{powers.file_prefix}_{j}"
                )
            )
            # NOTE(review): the rands file name does not depend on j, so it is
            # listed k times; kept to preserve the original upload list.
            file_names.append(rands.build_filename(n, t, i))

        stime = time()
        urls = s3manager.upload_files(file_names)
        total_time += time() - stime

        fname = f"{runid}-{i}-links"
        with open(fname, "w") as f:
            for input_url in urls:
                print(input_url, file=f)
        links_url = s3manager.upload_file(fname)
        commands.append(
            f"cd sharedata; curl -sSO {links_url}; bash download_input.sh {fname}"
        )
        setup_commands.append([instance_id, commands])

    logging.info(f"Upload completed in {total_time} seconds.")
    return setup_commands


def trigger_run(run_id, skip_setup, max_k, only_setup, cleanup):
    """Drive one benchmark run on the EC2 instances.

    Args:
        run_id: Identifier for this run; used for the S3 manager, the config
            ``extra`` payload and the local log directory.
        skip_setup: When true, the setup commands are not generated or run.
        max_k: Forwarded to the powermixing / butterfly setup builders.
        only_setup: When true, the MPC command and log collection are skipped.
        cleanup: When true, only kill containers and delete files on the
            instances, then return.

    Raises:
        SystemError: If ``AwsConfig.MPC_CONFIG.COMMAND`` is not one of the
            supported applications.
    """
    os.makedirs("sharedata/", exist_ok=True)
    logging.info(f"Run Id: {run_id}")
    ec2manager, s3manager = EC2Manager(), S3Manager(run_id)
    instance_ids, instance_ips = ec2manager.create_instances()

    if cleanup:
        instance_commands = [
            [instance_id, ["sudo docker kill $(sudo docker ps -q); rm -rf *"]]
            for instance_id in instance_ids
        ]
        run_commands_on_instances(ec2manager, instance_commands)
        return

    port = AwsConfig.MPC_CONFIG.PORT
    command = AwsConfig.MPC_CONFIG.COMMAND

    if command.endswith("ipc"):
        instance_configs = get_instance_configs(instance_ips)
    elif command.endswith(_APPS_WITH_K):
        instance_configs = get_instance_configs(
            instance_ips, {"k": AwsConfig.MPC_CONFIG.K, "run_id": run_id}
        )
    else:
        logging.error("Application not supported to run on AWS.")
        raise SystemError

    logging.info(f"Uploading config file to S3 in '{AwsConfig.BUCKET_NAME}' bucket.")
    config_urls = s3manager.upload_configs(instance_configs)
    logging.info("Config file upload complete.")

    logging.info("Triggering config update on instances.")
    config_update_commands = [
        [instance_id, ["mkdir -p config", "cd config; curl -sSO %s" % (config_url)]]
        for config_url, instance_id in zip(config_urls, instance_ids)
    ]
    run_commands_on_instances(ec2manager, config_update_commands, False)
    logging.info("Config update completed successfully.")

    if not skip_setup:
        # The command was validated above, so one of these branches matches.
        if command.endswith("ipc"):
            setup_commands = get_ipc_setup_commands(s3manager, instance_ids)
        elif command.endswith("powermixing"):
            setup_commands = get_powermixing_setup_commands(
                max_k, run_id, s3manager, instance_ids
            )
        elif command.endswith("butterfly_network"):
            setup_commands = get_butterfly_network_setup_commands(
                max_k, s3manager, instance_ids
            )
        elif command.endswith(("hbavss_batch", "hbavss_light")):
            setup_commands = get_hbavss_setup_commands(s3manager, instance_ids)
        logging.info("Triggering setup commands.")
        run_commands_on_instances(ec2manager, setup_commands, False)
        # Logged here so it is only reported when setup actually ran.
        logging.info("Setup commands executed successfully.")

    if not only_setup:
        instance_commands = [
            [
                instance_id,
                [
                    f"sudo docker run\
                -p {port}:{port} \
                -v /home/ubuntu/config:/usr/src/HoneyBadgerMPC/config/ \
                -v /home/ubuntu/sharedata:/usr/src/HoneyBadgerMPC/sharedata/ \
                -v /home/ubuntu/benchmark-logs:/usr/src/HoneyBadgerMPC/benchmark-logs/ \
                {AwsConfig.DOCKER_IMAGE_PATH} \
                {AwsConfig.MPC_CONFIG.COMMAND} -d -f config/config-{i}.json"
                ],
            ]
            for i, instance_id in enumerate(instance_ids)
        ]
        logging.info("Triggering MPC commands.")
        run_commands_on_instances(ec2manager, instance_commands)

        logging.info("Collecting logs.")
        log_collection_cmds = [
            [instance_id, ["cat benchmark-logs/*.log"]]
            for instance_id in instance_ids
        ]
        os.makedirs(run_id, exist_ok=True)
        run_commands_on_instances(
            ec2manager, log_collection_cmds, True, f"{run_id}/benchmark-logs"
        )

    s3manager.cleanup()


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Runs HBMPC code on AWS.")
    parser.add_argument(
        "-s",
        "--skip-setup",
        dest="skip_setup",
        action="store_true",
        help="If this is passed, then the setup commands are skipped.",
    )
    parser.add_argument(
        "-c",
        "--cleanup",
        dest="cleanup",
        action="store_true",
        help="This kills all running containers and deletes all stored files.",
    )
    parser.add_argument(
        "-k",
        "--max-k",
        default=AwsConfig.MPC_CONFIG.K,
        type=int,
        dest="max_k",
        help="Maximum value of k for which the inputs need to be \
        created and uploaded during the setup phase. This value is \
        ignored if --skip-setup is passed. (default: `k` in aws_config.json)",
    )
    parser.add_argument(
        "--only-setup",
        dest="only_setup",
        action="store_true",
        help="If this value is passed, then only the setup phase is run,\
         otherwise both phases are run.",
    )
    parser.add_argument(
        "--run-id",
        dest="run_id",
        nargs="?",
        help="If skip setup is passed, then a previous run_id for the same\
        MPC application needs to be specified to pickup the correct input files.",
    )
    args = parser.parse_args()
    if args.skip_setup and args.only_setup:
        parser.error("--only-setup and --skip-setup are mutually exclusive.")
    if args.skip_setup and not args.run_id:
        parser.error("--run-id needs to be passed with --skip-setup.")
    args.run_id = uuid.uuid4().hex if args.run_id is None else args.run_id
    # ... (the listing is truncated here; the rest of the script is not shown)

# aws_wrapper.py
Source:aws_wrapper.py  
import boto3
from botocore.exceptions import ClientError
import logging
from json import loads, dumps
from typing import List, Any, Dict, Callable, Optional, Tuple, Iterable
from uuid import uuid4
from datetime import datetime

logger = logging.getLogger("aws")
# AwsConfig = Config.RawConfig['aws']
logging.getLogger('botocore').setLevel(logging.ERROR)
logging.getLogger('urllib3').setLevel(logging.ERROR)


def getClient(awsconfig):
    """Return a boto3 SQS client built from explicit credentials.

    Reads ``key_id``, ``access_key`` and ``region`` from ``awsconfig``.
    """
    # Disabled alternative kept from the original listing:
    # if awsconfig['use_system_credentials']:
    #     return boto3.client('sqs')
    return boto3.client(
        'sqs',
        aws_access_key_id=awsconfig['key_id'],
        aws_secret_access_key=awsconfig['access_key'],
        region_name=awsconfig['region'],
    )


def _process_message(sqs, queue_url, message, callback) -> None:
    """Decode one SQS message, hand it to ``callback`` and delete it if asked.

    The message is deleted when its body is not valid JSON, when the callback
    raises, or when the callback returns a truthy value.
    """
    try:
        bodyjson = loads(message['Body'])
    except Exception as lse:
        escaped = message['Body'].replace("\n", "\\n")
        logging.error(f"Got an invalid SQS message! Deleting it. Message: {escaped}. Stack trace: {lse}")
        sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=message['ReceiptHandle'])
        return

    try:
        clear_it = callback(bodyjson)
    except Exception as cbe:
        escaped = message['Body'].replace("\n", "\\n")
        logging.error(f"Failure calling message callback on SQS queue. Message: {escaped}. Stack trace: {cbe}")
        sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=message['ReceiptHandle'])
        return

    if clear_it:
        sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=message['ReceiptHandle'])


def read_queue(awsconfig, callback: Optional[Callable[[Any], Any]] = None) -> None:
    """Drain the ``incoming`` queue, passing each decoded body to ``callback``.

    Polling stops when a batch comes back smaller than the maximum batch
    size (or empty), or once more than 45 seconds have elapsed.

    Args:
        awsconfig: Mapping with the client credentials and the ``incoming``
            queue URL.
        callback: Called with the JSON-decoded message body; a truthy return
            value deletes the message. When ``None``, messages are only
            logged and left on the queue.
    """
    max_messages = 10  # 0-10, limited by AWS
    sqs = getClient(awsconfig)
    queue_url = awsconfig['incoming']
    total_read = 0
    started = datetime.now()

    # Don't stay in this loop for too long.
    while (datetime.now() - started).total_seconds() <= 45:
        response = sqs.receive_message(
            QueueUrl=queue_url,
            AttributeNames=['SentTimestamp'],
            MaxNumberOfMessages=max_messages,
            MessageAttributeNames=['All'],
            VisibilityTimeout=20,
            WaitTimeSeconds=5,
        )
        # The 'Messages' key is absent when the poll returned nothing.
        messages = response.get('Messages', [])
        for m in messages:
            if callback is not None:
                _process_message(sqs, queue_url, m, callback)
            else:
                logging.warning(f'Got SQS Message With No Callback!: {m["Body"]}')
        total_read += len(messages)
        if len(messages) < max_messages:
            break

    if total_read > 0:
        # Report the running total; the last response may hold no messages.
        logger.debug(f"Read {total_read} message(s) from queue!")
    else:
        logging.debug("No messages in response")


def write_queue(messages: Iterable[Tuple[Any, Any]], awsconfig) -> Tuple[List[Any], List[Any]]:
    """Send each message to the ``outgoing`` queue.

    Args:
        messages: Iterable of ``(item, mapping)`` pairs; ``mapping`` is
            JSON-encoded as the message body and ``item`` is what gets
            reported back.
        awsconfig: Mapping with the client credentials and the ``outgoing``
            queue URL.

    Returns:
        ``(succeeded, failed)`` lists of the ``item`` values.
    """
    sqs = getClient(awsconfig)
    succeeded = []
    failed = []
    for (item, mapping) in messages:
        try:
            sqs.send_message(QueueUrl=awsconfig['outgoing'],
                             MessageBody=dumps(mapping),
                             MessageGroupId='hostname',  # TODO
                             MessageDeduplicationId=str(uuid4()))
            succeeded.append(item)
        except ClientError as clienterror:
            logging.error(f"Failed to send message: {clienterror}")
            failed.append(item)
        except Exception as ex:
            logging.warning(f"Failed to send messages: {ex}")
            failed.append(item)
    return (succeeded, failed)


def TestAws(awsconfig: Dict[str, str]):
    """Issue one short receive call against the incoming and outgoing queues.

    The responses are discarded; any exception raised by the client
    propagates to the caller.
    """
    sqs = getClient(awsconfig)
    for queue_key in ('incoming', 'outgoing'):
        sqs.receive_message(QueueUrl=awsconfig[queue_key], AttributeNames=['SentTimestamp'],
                            MaxNumberOfMessages=1,
                            MessageAttributeNames=['All'],
                            VisibilityTimeout=5,
                            WaitTimeSeconds=1)


# Example usage kept from the original listing:
# def cb(body):
#     print(body)
#     pass
# read_queue(callback=cb)
# ... (the listing is truncated here)

# Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
