Best Python code snippet using localstack_python
alcor_test_setup.py
Source:alcor_test_setup.py  
#!/usr/bin/env python3
import requests
import json
from time import sleep


def _send_until_ok(send, url, body, headers, label):
    """Send one write request, then poll ``url`` until a response is OK.

    Args:
        send: The ``requests`` callable used for the initial request
            (``requests.post`` or ``requests.put``).
        url: Endpoint that receives the initial request and any follow-up
            polling.
        body: JSON-serialisable payload for the initial request.
        headers: HTTP headers for the initial request.
        label: Prefix used in the ``"<label> response <text>"`` log lines.
    """
    response = send(url, headers=headers, verify=False, data=json.dumps(body))
    print("{} response {}".format(label, response.text))
    while not response.ok:
        sleep(5)
        print("{} response {}".format(label, response.text))
        # NOTE(review): the follow-up is a bare GET on the same URL (no
        # headers, no body), not a re-send of the original POST/PUT. That is
        # the behaviour of the original script and is kept as-is; confirm
        # whether the write was meant to be retried instead.
        response = requests.get(url)


def main():
    """Create the sample VPC, security group, subnets and router, then
    attach every subnet to the router.

    Each step prints a banner, sends its request to the hard-coded host and
    blocks until a response for that step's endpoint reports OK.
    """
    address = "10.213.43.224"
    project_id = "123456789"
    router_id = "11112801-d675-4688-a63f-dcda8d327f50"
    vpc_id = "9192a4d4-ffff-4ece-b3f0-8d36e3d88001"
    # Subnet N uses id ...8800N and CIDR 10.0.N.0/24.
    subnet_numbers = range(1, 6)
    subnet_id_template = "8182a4d4-ffff-4ece-b3f0-8d36e3d8800{}"

    ncm_port = "30007"
    sgm_port = "30008"
    vpm_port = "30001"
    sm_port = "30002"
    router_port = "30003"
    headers = {'Content-Type': 'application/json'}

    def project_url(port, resource):
        # All project-scoped endpoints share the same URL shape.
        return "http://{}:{}/project/{}/{}".format(
            address, port, project_id, resource)

    create_node_ncm_endpoint = "http://{}:{}/ncms".format(address, ncm_port)
    create_sg_endpoint = project_url(sgm_port, "security-groups")
    create_subnet_endpoint = project_url(sm_port, "subnets")
    create_vpc_endpoint = project_url(vpm_port, "vpcs")
    create_router_endpoint = project_url(router_port, "routers")
    attach_router_endpoint = project_url(
        router_port, "routers/{}/add_router_interface".format(router_id))

    # NCM node registration is currently disabled. To re-enable:
    # print("###############REGISTER NODE NCM###############")
    # ncm_body = {
    #     "ncm_info": {
    #         "cap": 1,
    #         "id": "ncm_id_1",
    #         "uri": "ncm_uri_1"
    #     }
    # }
    # _send_until_ok(requests.post, create_node_ncm_endpoint, ncm_body,
    #                headers, "create_ncm")

    print("###############CREATE VPC###############")
    network_body = {
        "network": {
            "admin_state_up": True,
            "revision_number": 0,
            "cidr": "10.0.0.0/16",
            "default": True,
            "description": "vpc",
            "dns_domain": "domain",
            "id": vpc_id,
            "is_default": True,
            "mtu": 1400,
            "name": "sample_vpc",
            "port_security_enabled": True,
            "project_id": project_id
        }
    }
    _send_until_ok(requests.post, create_vpc_endpoint, network_body,
                   headers, "create_vpc")

    print("###############CREATE SG###############")
    sg_body = {
        "security_group": {
            "create_at": "string",
            "description": "string",
            "id": "3dda2801-d675-4688-a63f-dcda8d111111",
            "name": "sg1",
            "project_id": project_id,
            "security_group_rules": [
            ],
            "tenant_id": project_id,
            "update_at": "string"
        }
    }
    _send_until_ok(requests.post, create_sg_endpoint, sg_body,
                   headers, "create_sg")

    for number in subnet_numbers:
        print("###############CREATE SUBNET{}###############".format(number))
        subnet_body = {
            "subnet": {
                "cidr": "10.0.{}.0/24".format(number),
                "id": subnet_id_template.format(number),
                "ip_version": 4,
                "network_id": vpc_id,
                # The original named subnets 3-5 "subnet2" (copy-paste slip);
                # each subnet now gets a name matching its number.
                "name": "subnet{}".format(number)
            }
        }
        _send_until_ok(requests.post, create_subnet_endpoint, subnet_body,
                       headers, "create_subnet")

    print("###############CREATE ROUTER###############")
    router_body = {
        "router": {
            "admin_state_up": True,
            "availability_zone_hints": [
                "string"
            ],
            "availability_zones": [
                "string"
            ],
            "conntrack_helpers": [
                "string"
            ],
            "description": "string",
            "distributed": True,
            "external_gateway_info": {
                "enable_snat": True,
                "external_fixed_ips": [],
                "network_id": vpc_id
            },
            "flavor_id": "string",
            "gateway_ports": [
            ],
            "ha": True,
            "id": router_id,
            "name": "router1",
            "owner": "string",
            "project_id": project_id,
            "revision_number": 0,
            "routetable": {},
            "service_type_id": "string",
            "status": "BUILD",
            "tags": [
                "string"
            ],
            "tenant_id": project_id
        }
    }
    _send_until_ok(requests.post, create_router_endpoint, router_body,
                   headers, "create_router")

    for number in subnet_numbers:
        print("###############ATTACH SUBNET{} TO ROUTER###############".format(
            number))
        attach_router_body = {
            "subnet_id": subnet_id_template.format(number)
        }
        _send_until_ok(requests.put, attach_router_endpoint,
                       attach_router_body, headers, "attachRouter")
# ...s3_vpc_endpoint.py
Source:s3_vpc_endpoint.py  
...57            if (vpc_endpoint.vpc_id == vpc_id and58                    vpc_endpoint.route_tables[0] == route_table_id):59                return vpc_endpoint60        return None61    def create_vpc_endpoint(self, vpc_id, route_table_id, service_name):62        """Creates a VPC endpoint in the VPC and ties it with a route table"""63        params = {'VpcId': vpc_id, 'RouteTableId.1': route_table_id,64                  'ServiceName': service_name}65        return self.get_object('CreateVpcEndpoint', params, VPCEndpoint,66                               verb='POST')67class S3VPCEndpoint(CustomActionNode):68    """Represents a VPC endpoint for Amazon S3"""69    INPUTS = {70        'Region': ['global:Region'],71        'VpcId': ['global:VpcId', 'VPC:VpcId'],72        'RouteTableId': ['global:RouteTableId', 'VPC:RouteTableId'],73        'StackType': ['global:StackType'],74    }75    DEFAULTS = {76        'Region': 'us-east-1',77        'StackType': 'Staging',78    }79    ATTRIBUTES = {'StackType': 'StackType'}80    def action(self):81        region = self.get_input('Region')82        vpc_id = self.get_input('VpcId')83        route_table_id = self.get_input('RouteTableId')84        conn = CustomVPCConnection(region=region,85                                   profile_name=self.aws_profile)86        vpc_endpoint = conn.get_vpc_endpoint(vpc_id, route_table_id)87        if vpc_endpoint is None:88            conn.create_vpc_endpoint(vpc_id, route_table_id,89                                     self.get_service_name(region))90    def get_service_name(self, region):...create_vpc_endpoints.py
Source:create_vpc_endpoints.py  
...30                ToPort=443,31                FromPort=443,32            )33    return security_group34def create_vpc_endpoint(35    vpc_id: str, service_name: str, subnets: list36) -> list:37    result = []38    if is_vpc_endpoint_exists(vpc_id=vpc_id, service_name=service_name):39        print(40            f"Endpoint {service_name} already exists in VPC {vpc_id}. Skip endpoint creation."41        )42        return43    if subnets:44        security_group = create_endpoint_security_group(45            vpc_id, subnets, service_name46        )47        result.append({48            "Action": "Created security group",49            "Result": security_group50        })51        vpc_endpoint = ec2.create_vpc_endpoint(52            VpcEndpointType="Interface",53            VpcId=vpc_id,54            ServiceName=service_name,55            SubnetIds=subnets,56            SecurityGroupIds=[security_group["GroupId"]],57        )58        result.append({59            "Action": "Created VPC endpoint",60            "Result": vpc_endpoint["VpcEndpoint"]61        })62    return result63if __name__ == "__main__":64    parser = argparse.ArgumentParser(65        description="Create VPC endpoint for private subnets with NAT Gateway configured."66    )67    parser.add_argument(68        "--service-name",69        type=str,70        help="The name of AWS service to create a VPC endpoint for (for example, com.amazonaws.us-east-1.kms)",71        required=True72    )73    parser.add_argument('infile', nargs='?', type=argparse.FileType('r'), default=sys.stdin)74    args = parser.parse_args()75    input = json.load(sys.stdin)76    actions = []77    for vpc in input["VPC"]:78        if "VpcId" in vpc and "Subnets" in vpc:79            action = create_vpc_endpoint(80                vpc_id=vpc["VpcId"],81                service_name=args.service_name,82                subnets=vpc["Subnets"]83            )84            actions += action...Learn to execute automation testing from scratch 
with LambdaTest Learning Hub. From setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios, LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
