How to use the create_vpc_endpoint method in localstack

Best Python code snippet using localstack_python

alcor_test_setup.py

Source:alcor_test_setup.py Github

copy

Full Screen

#!/usr/bin/env python3
import requests
import json
from time import sleep


def _send_and_wait(send, endpoint, headers, body, label):
    """Send one JSON request, then block until the endpoint answers OK.

    Args:
        send: Callable used for the initial request (``requests.post`` or
            ``requests.put``).
        endpoint: URL the request is sent to and that is polled afterwards.
        headers: HTTP headers for the initial request.
        body: JSON-serialisable payload of the initial request.
        label: Prefix used in the printed "<label> response ..." lines.

    Returns:
        The last response received (the first one whose ``ok`` is true).
    """
    response = send(endpoint,
                    headers=headers,
                    verify=False,
                    data=json.dumps(body))
    print("{} response {}".format(label, response.text))
    # NOTE: a failed request is never re-sent.  The loop only polls the same
    # URL with a plain GET (no headers, no body) every 5 seconds until it
    # answers with an OK status, and it has no upper bound on retries.
    while not response.ok:
        sleep(5)
        print("{} response {}".format(label, response.text))
        response = requests.get(endpoint)
    return response


def main():
    """Create the sample VPC, security group, subnets and router.

    Resources are created in a fixed order (VPC, security group, subnets
    1-5, router) and then each subnet is attached to the router.  Every step
    prints a banner and the raw response text.
    """
    address = "10.213.43.224"
    project_id = "123456789"
    router_id = "11112801-d675-4688-a63f-dcda8d327f50"
    vpc_id = "9192a4d4-ffff-4ece-b3f0-8d36e3d88001"
    ncm_port = "30007"
    sgm_port = "30008"
    vpm_port = "30001"
    sm_port = "30002"
    router_port = "30003"
    headers = {'Content-Type': 'application/json'}

    create_node_ncm_endpoint = "http://{}:{}/ncms".format(
        address, ncm_port)
    create_sg_endpoint = "http://{}:{}/project/{}/security-groups".format(
        address, sgm_port, project_id)
    create_subnet_endpoint = "http://{}:{}/project/{}/subnets".format(
        address, sm_port, project_id)
    create_vpc_endpoint = "http://{}:{}/project/{}/vpcs".format(
        address, vpm_port, project_id)
    create_router_endpoint = "http://{}:{}/project/{}/routers".format(
        address, router_port, project_id)
    attach_router_endpoint = "http://{}:{}/project/{}/routers/{}/add_router_interface".format(
        address, router_port, project_id, router_id)

    # NCM node registration is disabled in the original script; kept for
    # reference (it is the only user of create_node_ncm_endpoint).
    # print("###############REGISTER NODE NCM###############")
    # ncm_body = {
    #     "ncm_info": {
    #         "cap": 1,
    #         "id": "ncm_id_1",
    #         "uri": "ncm_uri_1"
    #     }
    # }
    # response = requests.post(create_node_ncm_endpoint,
    #                          headers=headers,
    #                          verify=False,
    #                          data=json.dumps(ncm_body))
    # print("create_ncm response {}".format(response.text))
    # while not response.ok:
    #     sleep(5)
    #     response = requests.get(create_node_ncm_endpoint)

    print("###############CREATE VPC###############")
    network_body = {
        "network": {
            "admin_state_up": True,
            "revision_number": 0,
            "cidr": "10.0.0.0/16",
            "default": True,
            "description": "vpc",
            "dns_domain": "domain",
            "id": vpc_id,
            "is_default": True,
            "mtu": 1400,
            "name": "sample_vpc",
            "port_security_enabled": True,
            "project_id": project_id
        }
    }
    _send_and_wait(requests.post, create_vpc_endpoint, headers,
                   network_body, "create_vpc")

    print("###############CREATE SG###############")
    sg_body = {
        "security_group": {
            "create_at": "string",
            "description": "string",
            "id": "3dda2801-d675-4688-a63f-dcda8d111111",
            "name": "sg1",
            "project_id": project_id,
            "security_group_rules": [
            ],
            "tenant_id": "123456789",
            "update_at": "string"
        }
    }
    _send_and_wait(requests.post, create_sg_endpoint, headers,
                   sg_body, "create_sg")

    # (banner number, CIDR, subnet id, subnet name)
    # NOTE(review): subnets 3-5 reuse the name "subnet2" exactly as in the
    # original listing -- looks like a copy-paste leftover; confirm before
    # changing, since the name is part of the request payload.
    subnets = [
        (1, "10.0.1.0/24", "8182a4d4-ffff-4ece-b3f0-8d36e3d88001", "subnet1"),
        (2, "10.0.2.0/24", "8182a4d4-ffff-4ece-b3f0-8d36e3d88002", "subnet2"),
        (3, "10.0.3.0/24", "8182a4d4-ffff-4ece-b3f0-8d36e3d88003", "subnet2"),
        (4, "10.0.4.0/24", "8182a4d4-ffff-4ece-b3f0-8d36e3d88004", "subnet2"),
        (5, "10.0.5.0/24", "8182a4d4-ffff-4ece-b3f0-8d36e3d88005", "subnet2"),
    ]

    for number, cidr, subnet_id, subnet_name in subnets:
        print("###############CREATE SUBNET{}###############".format(number))
        subnet_body = {
            "subnet":
            {
                "cidr": cidr,
                "id": subnet_id,
                "ip_version": 4,
                "network_id": vpc_id,
                "name": subnet_name
            }
        }
        _send_and_wait(requests.post, create_subnet_endpoint, headers,
                       subnet_body, "create_subnet")

    print("###############CREATE ROUTER###############")
    router_body = {
        "router": {
            "admin_state_up": True,
            "availability_zone_hints": [
                "string"
            ],
            "availability_zones": [
                "string"
            ],
            "conntrack_helpers": [
                "string"
            ],
            "description": "string",
            "distributed": True,
            "external_gateway_info": {
                "enable_snat": True,
                "external_fixed_ips": [],
                "network_id": vpc_id
            },
            "flavor_id": "string",
            "gateway_ports": [
            ],
            "ha": True,
            "id": router_id,
            "name": "router1",
            "owner": "string",
            "project_id": project_id,
            "revision_number": 0,
            "routetable": {},
            "service_type_id": "string",
            "status": "BUILD",
            "tags": [
                "string"
            ],
            "tenant_id": "123456789"
        }
    }
    _send_and_wait(requests.post, create_router_endpoint, headers,
                   router_body, "create_router")

    for number, _cidr, subnet_id, _name in subnets:
        print("###############ATTACH SUBNET{} TO ROUTER###############".format(number))
        attachRouter_body = {
            "subnet_id": subnet_id
        }
        _send_and_wait(requests.put, attach_router_endpoint, headers,
                       attachRouter_body, "attachRouter")

    # ... (the source listing is truncated here)

Full Screen

Full Screen

s3_vpc_endpoint.py

Source:s3_vpc_endpoint.py Github

copy

Full Screen

...57 if (vpc_endpoint.vpc_id == vpc_id and58 vpc_endpoint.route_tables[0] == route_table_id):59 return vpc_endpoint60 return None61 def create_vpc_endpoint(self, vpc_id, route_table_id, service_name):62 """Creates a VPC endpoint in the VPC and ties it with a route table"""63 params = {'VpcId': vpc_id, 'RouteTableId.1': route_table_id,64 'ServiceName': service_name}65 return self.get_object('CreateVpcEndpoint', params, VPCEndpoint,66 verb='POST')67class S3VPCEndpoint(CustomActionNode):68 """Represents a VPC endpoint for Amazon S3"""69 INPUTS = {70 'Region': ['global:Region'],71 'VpcId': ['global:VpcId', 'VPC:VpcId'],72 'RouteTableId': ['global:RouteTableId', 'VPC:RouteTableId'],73 'StackType': ['global:StackType'],74 }75 DEFAULTS = {76 'Region': 'us-east-1',77 'StackType': 'Staging',78 }79 ATTRIBUTES = {'StackType': 'StackType'}80 def action(self):81 region = self.get_input('Region')82 vpc_id = self.get_input('VpcId')83 route_table_id = self.get_input('RouteTableId')84 conn = CustomVPCConnection(region=region,85 profile_name=self.aws_profile)86 vpc_endpoint = conn.get_vpc_endpoint(vpc_id, route_table_id)87 if vpc_endpoint is None:88 conn.create_vpc_endpoint(vpc_id, route_table_id,89 self.get_service_name(region))90 def get_service_name(self, region):...

Full Screen

Full Screen

create_vpc_endpoints.py

Source:create_vpc_endpoints.py Github

copy

Full Screen

...30 ToPort=443,31 FromPort=443,32 )33 return security_group34def create_vpc_endpoint(35 vpc_id: str, service_name: str, subnets: list36) -> list:37 result = []38 if is_vpc_endpoint_exists(vpc_id=vpc_id, service_name=service_name):39 print(40 f"Endpoint {service_name} already exists in VPC {vpc_id}. Skip endpoint creation."41 )42 return43 if subnets:44 security_group = create_endpoint_security_group(45 vpc_id, subnets, service_name46 )47 result.append({48 "Action": "Created security group",49 "Result": security_group50 })51 vpc_endpoint = ec2.create_vpc_endpoint(52 VpcEndpointType="Interface",53 VpcId=vpc_id,54 ServiceName=service_name,55 SubnetIds=subnets,56 SecurityGroupIds=[security_group["GroupId"]],57 )58 result.append({59 "Action": "Created VPC endpoint",60 "Result": vpc_endpoint["VpcEndpoint"]61 })62 return result63if __name__ == "__main__":64 parser = argparse.ArgumentParser(65 description="Create VPC endpoint for private subnets with NAT Gateway configured."66 )67 parser.add_argument(68 "--service-name",69 type=str,70 help="The name of AWS service to create a VPC endpoint for (for example, com.amazonaws.us-east-1.kms)",71 required=True72 )73 parser.add_argument('infile', nargs='?', type=argparse.FileType('r'), default=sys.stdin)74 args = parser.parse_args()75 input = json.load(sys.stdin)76 actions = []77 for vpc in input["VPC"]:78 if "VpcId" in vpc and "Subnets" in vpc:79 action = create_vpc_endpoint(80 vpc_id=vpc["VpcId"],81 service_name=args.service_name,82 subnets=vpc["Subnets"]83 )84 actions += action...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful