Best Python code snippet using localstack_python
load_balancer.py
Source:load_balancer.py  
...44                },45            ],46            )47time.sleep(120)48existing_instances = ec2.describe_instances()49pub_ip={}50for i in (existing_instances["Reservations"]):51    if ("Tags" in list(i["Instances"][0].keys())):52        for tag in (i["Instances"][0]["Tags"]):53            if(tag["Value"]=="tirta"):54                status = i["Instances"][0]["State"]["Name"]55                if status == "running":56                    pub_ip[(i["Instances"][0]["InstanceId"])]= i["Instances"][0]["PublicIpAddress"]57print("As publics IP que estao rodando sao: ", pub_ip)58@app.route('/', defaults={'path': ''})59@app.route('/<path:path>')60def catch_all(path):61    global pub_ip62    ip = random.choice(list(pub_ip.values()))63    return redirect("http://" + ip + ":5000/" + path,code=307)64#https://www.saltycrane.com/blog/2008/09/simplistic-python-thread-example/65def timeout():66    print("Deu ruim... timeout")67    global id_atual68    global pub_ip69    global quant70    print("a id que falhou eh",id_atual)71    existing_instances = ec2.describe_instances()72    ec2.terminate_instances(InstanceIds=[id_atual])73    74    75    76    existing_instances = ec2.describe_instances()77    for i in range(len(existing_instances["Reservations"])):78        for tag in (existing_instances["Reservations"][i]["Instances"]):79            if(tag["InstanceId"]==id_atual):80                status=existing_instances["Reservations"][i]["Instances"][0]["State"]["Name"]81                while (status!="terminated"):82                    existing_instances = ec2.describe_instances()83                    status=existing_instances["Reservations"][i]["Instances"][0]["State"]["Name"]84                    time.sleep(8)85                    continue86    pub_ip={}87    existing_instances = ec2.describe_instances()88    for i in (existing_instances["Reservations"]):89        if ("Tags" in list(i["Instances"][0].keys())):90            for tag in (i["Instances"][0]["Tags"]):91                
if(tag["Value"]=="tirta"):92                    status = i["Instances"][0]["State"]["Name"]93                    if status == "running":94                        pub_ip[(i["Instances"][0]["InstanceId"])]= i["Instances"][0]["PublicIpAddress"]95    if (len(list(pub_ip.values())))>int(quant):96        existing_instances = ec2.describe_instances()97        id_extra=random.choice(list(pub_ip.keys()))98        ec2.terminate_instances(InstanceIds=[id_extra])99        existing_instances = ec2.describe_instances()100        for i in range(len(existing_instances["Reservations"])):101            for tag in (existing_instances["Reservations"][i]["Instances"]):102                if(tag["InstanceId"]==id_atual):103                    status=existing_instances["Reservations"][i]["Instances"][0]["State"]["Name"]104                    while (status!="terminated"):105                        existing_instances = ec2.describe_instances()106                        status=existing_instances["Reservations"][i]["Instances"][0]["State"]["Name"]107                        time.sleep(8)108                        continue109    while(len(list(pub_ip.values()))<int(quant)):110        print("Creating Instance")111        ec2_service.create_instances(ImageId='ami-0ac019f4fcb7cb7e6', MinCount=1, MaxCount=1,112            InstanceType='t2.micro',113            KeyName='teste',114            SecurityGroups=['aps'],115            UserData="""#!/bin/bash116                    cd home/ubuntu/117                    git clone https://github.com/eduardotp1/projeto_cloud.git118                    sudo apt-get -y update119                    sudo apt-get install -y python3-pip120                    sudo pip3 install flask121                    sudo pip3 install flask_restful122                    sudo pip3 install boto3123                    cd projeto_cloud/124                    python3 app.py125                    """,126            TagSpecifications=[127                {128                    
'ResourceType': 'instance',129                    'Tags': [130                        {131                            'Key': 'Owner',132                            'Value': 'tirta'133                        },134                    ]135                },136            ],137            )138        time.sleep(120)139        pub_ip={}140        existing_instances = ec2.describe_instances()141        for i in (existing_instances["Reservations"]):142            if ("Tags" in list(i["Instances"][0].keys())):143                for tag in (i["Instances"][0]["Tags"]):144                    if(tag["Value"]=="tirta"):145                        status = i["Instances"][0]["State"]["Name"]146                        if status == "running":147                            pub_ip[(i["Instances"][0]["InstanceId"])]= i["Instances"][0]["PublicIpAddress"]148    149    existing_instances = ec2.describe_instances()150    for i in (existing_instances["Reservations"]):151        if ("Tags" in list(i["Instances"][0].keys())):152            for tag in (i["Instances"][0]["Tags"]):153                if(tag["Value"]=="tirta"):154                    status = i["Instances"][0]["State"]["Name"]155                    if status == "running":156                        pub_ip[(i["Instances"][0]["InstanceId"])]= i["Instances"][0]["PublicIpAddress"]157    health()158if (len(list(pub_ip.values()))!=0):159    id_atual = list(pub_ip.values())[0]160def health():161    global pub_ip162    global id_atual163    global quant164    while(True):165        if (len(list(pub_ip.keys()))<int(quant)):166            print("Creating Instance")167            ec2_service.create_instances(ImageId='ami-0ac019f4fcb7cb7e6', MinCount=1, MaxCount=1,168                InstanceType='t2.micro',169                KeyName='teste',170                SecurityGroups=['aps'],171                UserData="""#!/bin/bash172                        cd home/ubuntu/173                        git clone 
https://github.com/eduardotp1/projeto_cloud.git174                        sudo apt-get -y update175                        sudo apt-get install -y python3-pip176                        sudo pip3 install flask177                        sudo pip3 install flask_restful178                        sudo pip3 install boto3179                        cd projeto_cloud/180                        python3 app.py181                        """,182                TagSpecifications=[183                    {184                        'ResourceType': 'instance',185                        'Tags': [186                            {187                                'Key': 'Owner',188                                'Value': 'tirta'189                            },190                        ]191                    },192                ],193                )194            time.sleep(120)195        elif (len(list(pub_ip.values()))>int(quant)):196            existing_instances = ec2.describe_instances()197            id_extra=random.choice(list(pub_ip.keys()))198            ec2.terminate_instances(InstanceIds=[id_extra])199            existing_instances = ec2.describe_instances()200            for i in range(len(existing_instances["Reservations"])):201                for tag in (existing_instances["Reservations"][i]["Instances"]):202                    if(tag["InstanceId"]==id_atual):203                        status=existing_instances["Reservations"][i]["Instances"][0]["State"]["Name"]204                        while (status!="terminated"):205                            existing_instances = ec2.describe_instances()206                            status=existing_instances["Reservations"][i]["Instances"][0]["State"]["Name"]207                            time.sleep(8)208                            continue209        pub_ip={}210        existing_instances = ec2.describe_instances()211        for i in (existing_instances["Reservations"]):212            if ("Tags" in 
list(i["Instances"][0].keys())):213                for tag in (i["Instances"][0]["Tags"]):214                    if(tag["Value"]=="tirta"):215                        status = i["Instances"][0]["State"]["Name"]216                        if status == "running":217                            pub_ip[(i["Instances"][0]["InstanceId"])]= i["Instances"][0]["PublicIpAddress"]218        for key, values in pub_ip.items():219            id_atual=key220            t = Timer(50.0,timeout)221            t.start()222            try:223                r=requests.get('http://' + values + ':5000/healthcheck')224            except:...test_aws.py
Source:test_aws.py  
1"""Test the AWS module"""2# pylint: disable=redefined-outer-name,no-self-use,unused-argument,protected-access,missing-docstring,invalid-name,line-too-long3from collections import namedtuple4import json5try:6    from unittest.mock import patch7except ImportError:8    from mock import patch9import pytest10from aws_ssh import aws, errors11SessionVars = namedtuple('SessionVars', 'session instance client describe_instances')12@pytest.fixture13def session_vars():14    with patch('boto3.session.Session') as session_mock:15        instance = session_mock.return_value16        client = instance.client17        describe_instances = client.return_value.describe_instances18        yield SessionVars(session_mock, instance, client, describe_instances)19def test_get_session():20    with patch('boto3.session.Session') as session_mock:21        aws.get_session('foobar')22        session_mock.assert_called_with(profile_name='foobar')23def test_get_instance_info(session_vars):24    session_vars.describe_instances.return_value = get_sample_response()25    info = aws.get_instance_info('foobar', 'test-', 'name')26    session_vars.session.assert_called_with(profile_name='foobar')27    session_vars.client.assert_called_with('ec2')28    assert session_vars.describe_instances.called29    assert info['PublicIpAddress'] == '52.90.39.59'30def test_get_instance_info_empty(session_vars):31    session_vars.describe_instances.return_value = get_sample_response(0)32    with pytest.raises(errors.NoInstanceFoundError):33        aws.get_instance_info('foobar', 'test-', 'name')34def test_get_instance_info_multiple(session_vars):35    session_vars.describe_instances.return_value = get_sample_response(2)36    with pytest.raises(errors.TooManyInstancesError):37        aws.get_instance_info('foobar', 'test-', 'name')38def get_sample_response(instance_count=1):39    response = {'Reservations': [{'Instances': []}]}40    if instance_count == 0:41        response['Reservations'] = []42        return 
response43    response['Reservations'][0]['Instances'] = [json.loads(SAMPLE_INSTANCE_BODY) for _ in range(instance_count)]44    return response45SAMPLE_INSTANCE_BODY = """46{"InstanceId": "i-0958008e", "ImageId": "ami-d05e75b8", "State": {"Code": 16, "Name": "running"}, "PrivateDnsName": "ip-10-0-0-186.ec2.internal", "PublicDnsName": "ec2-52.90.39.59.compute-1.amazonaws.com", "StateTransitionReason": "", "KeyName": "project", "AmiLaunchIndex": 0, "ProductCodes": [], "InstanceType": "m3.large", "LaunchTime": "2016-05-12T19:38:00.000Z", "Placement": {"AvailabilityZone": "us-east-1a", "GroupName": "", "Tenancy": "default"}, "Monitoring": {"State": "disabled"}, "SubnetId": "subnet-8dadbffa", "VpcId": "vpc-e821f18c", "PrivateIpAddress": "10.0.0.186", "PublicIpAddress": "52.90.39.59", "Architecture": "x86_64", "RootDeviceType": "ebs", "RootDeviceName": "/dev/sda1", "BlockDeviceMappings": [{"DeviceName": "/dev/sda1", "Ebs": {"VolumeId": "vol-3f4df594", "Status": "attached", "AttachTime": "2016-05-12T19:38:01.000Z", "DeleteOnTermination": true}}], "VirtualizationType": "hvm", "ClientToken": "project-Comput-1HKOY7086GV2G", "Tags": [{"Key": "POC", "Value": "Aru Sahni"}, {"Key": "Name", "Value": "project-compute"}, {"Key": "aws:cloudformation:logical-id", "Value": "ComputeInstance"}, {"Key": "Class", "Value": "project-compute"}, {"Key": "Project", "Value": "project"}, {"Key": "Stack", "Value": "project-cloudformation"}, {"Key": "aws:cloudformation:stack-id", "Value": "arn:aws:cloudformation:us-east-1:577354146450:stack/project/7ab2d640-66e6-11e5-955a-500150b34c7c"}, {"Key": "aws:cloudformation:stack-name", "Value": "project"}], "SecurityGroups": [{"GroupName": "project-SecurityGroupComputeWeb-W8OZK5PH7KRN", "GroupId": "sg-5457982f"}, {"GroupName": "project-SecurityGroupHGHQ-G86ETFB3FG4O", "GroupId": "sg-de9476b8"}], "SourceDestCheck": true, "Hypervisor": "xen", "NetworkInterfaces": [{"NetworkInterfaceId": "eni-800518c0", "SubnetId": "subnet-8dadbffa", "VpcId": "vpc-e821f18c", 
"Description": "Primary network interface", "OwnerId": "577354146450", "Status": "in-use", "MacAddress": "0a:1e:c9:33:d8:15", "PrivateIpAddress": "10.0.0.186", "PrivateDnsName": "ip-10-0-0-186.ec2.internal", "SourceDestCheck": true, "Groups": [{"GroupName": "project-SecurityGroupComputeWeb-W8OZK5PH7KRN", "GroupId": "sg-5457982f"}, {"GroupName": "project-SecurityGroupHGHQ-G86ETFB3FG4O", "GroupId": "sg-de9476b8"}], "Attachment": {"AttachmentId": "eni-attach-f61a4b0b", "DeviceIndex": 0, "Status": "attached", "AttachTime": "2016-05-12T19:38:00.000Z", "DeleteOnTermination": true}, "Association": {"PublicIp": "52.90.39.59", "PublicDnsName": "ec2-52.90.39.59.compute-1.amazonaws.com", "IpOwnerId": "amazon"}, "PrivateIpAddresses": [{"PrivateIpAddress": "10.0.0.186", "PrivateDnsName": "ip-10-0-0-186.ec2.internal", "Primary": true, "Association": {"PublicIp": "52.90.39.59", "PublicDnsName": "ec2-52.90.39.59.compute-1.amazonaws.com", "IpOwnerId": "amazon"}}], "Ipv6Addresses": []}], "IamInstanceProfile": {"Arn": "arn:aws:iam::577354146450:instance-profile/project-ComputeInstanceProfile-J4Q37C5EEWSI", "Id": "AIPAIGVRQT3L5JVKDWKCS"}, "EbsOptimized": false}...test_policy_gen.py
Source:test_policy_gen.py  
...15        stub.add_response('describe_images', {}, {})16        policy_gen = PolicyGenerator()17        policy_gen.record()18        stub.activate()19        ec2.describe_instances()20        ec2.describe_images()21        policy = json.loads(policy_gen.generate())22        assert 'Statement' in policy23        assert len(policy['Statement']) == 124        assert len(policy['Statement'][0]['Action']) == 225        assert 'ec2:DescribeInstances' in policy['Statement'][0]['Action']26        assert 'ec2:DescribeImages' in policy['Statement'][0]['Action']27    def test_records_policy_from_multiple_clients(self):28        ec2, ec2_stub = self.gen_client_and_stub('ec2')29        rds, rds_stub = self.gen_client_and_stub('rds')30        s3, s3_stub = self.gen_client_and_stub('s3')31        ec2_stub.add_response('describe_instances', {}, {})32        rds_stub.add_response('describe_db_instances', {}, {})33        s3_stub.add_response('list_buckets', {}, {})34        policy_gen = PolicyGenerator()35        policy_gen.record()36        ec2_stub.activate()37        rds_stub.activate()38        s3_stub.activate()39        ec2.describe_instances()40        rds.describe_db_instances()41        s3.list_buckets()42        policy = json.loads(policy_gen.generate())43        assert len(policy['Statement'][0]['Action']) == 344        assert 'ec2:DescribeInstances' in policy['Statement'][0]['Action']45        assert 's3:ListBuckets' in policy['Statement'][0]['Action']46        assert 'rds:DescribeDBInstances' in policy['Statement'][0]['Action']47    def test_actions_in_policy_are_unique(self):48        ec2, stub = self.gen_client_and_stub('ec2')49        for i in range(10):50            stub.add_response('describe_instances', {}, {})51        policy_gen = PolicyGenerator()52        policy_gen.record()53        stub.activate()54        for i in range(10):55            ec2.describe_instances()56        policy = json.loads(policy_gen.generate())57        assert 
len(policy['Statement'][0]['Action']) == 158        assert 'ec2:DescribeInstances' in policy['Statement'][0]['Action']59    def test_stub_can_be_activated_before_policy_gen(self):60        ec2, stub = self.gen_client_and_stub('ec2')61        stub.add_response('describe_instances', {}, {})62        stub.activate()63        policy_gen = PolicyGenerator()64        policy_gen.record()65        ec2.describe_instances()66        policy = json.loads(policy_gen.generate())67        assert len(policy['Statement'][0]['Action']) == 168        assert 'ec2:DescribeInstances' in policy['Statement'][0]['Action']69    @mock_ec270    def test_policy_is_recorded_when_not_stubbed(self):71        session = boto3.Session(region_name='eu-west-1')72        ec2 = session.client('ec2')73        policy_gen = PolicyGenerator()74        policy_gen.record()75        ec2.describe_instances()76        policy = json.loads(policy_gen.generate())77        assert len(policy['Statement'][0]['Action']) == 1...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
