How to use delete_vpc_endpoints method in localstack

Best Python code snippet using localstack_python

ec2_starter.py

Source:ec2_starter.py Github

copy

Full Screen

...223 if not hasattr(vpcs.VPCs, "delete_vpc_endpoints"):224 vpcs.VPCs.create_vpc_endpoint_service_configuration = (225 create_vpc_endpoint_service_configuration226 )227 def delete_vpc_endpoints(self):228 vpc_endpoints_ids = self._get_multi_param("VpcEndpointId")229 for ep_id in vpc_endpoints_ids:230 self.ec2_backend.vpc_end_points.pop(ep_id, None)231 result = {"DeleteVpcEndpointsResponse": {"@xmlns": XMLNS_EC2, "unsuccessful": []}}232 return xmltodict.unparse(result)233 if not hasattr(vpcs.VPCs, "delete_vpc_endpoints"):234 vpcs.VPCs.delete_vpc_endpoints = delete_vpc_endpoints235def start_ec2(port=None, asynchronous=False, update_listener=None):236 patch_ec2()237 port = port or config.PORT_EC2238 return start_moto_server(239 "ec2",240 port,241 name="EC2",...

Full Screen

Full Screen

__init__.py

Source:__init__.py Github

copy

Full Screen

...104 ]105 except Exception as error:106 print(error)107 exit(1)108def delete_vpc_endpoints(vpc_endpoint_ids: list) -> bool:109 try:110 client = boto3.client('ec2')111 client.delete_vpc_endpoints(VpcEndpointIds=vpc_endpoint_ids)112 return True113 except Exception as error:114 print(error)115 exit(1)116def list_vpc_peering_connection_ids(vpc_ids: list) -> list:117 try:118 if not vpc_ids: return []119 client = boto3.client('ec2')120 status_filter = {121 'Name': 'status-code',122 'Values': ['pending-acceptance','expired','provisioning','active','rejected']123 }124 accepter_filter = {125 'Name': 'accepter-vpc-info.vpc-id',126 'Values': vpc_ids127 }128 requester_filter = {129 'Name': 'requester-vpc-info.vpc-id',130 'Values': vpc_ids131 }132 response = client.describe_vpc_peering_connections(133 Filters=[ status_filter, accepter_filter ],134 )135 vpc_peering_connection_ids = [136 vpc_peering_connection['VpcPeeringConnectionId']137 for vpc_peering_connection in response['VpcPeeringConnections']138 ]139 response = client.describe_vpc_peering_connections(140 Filters=[ status_filter, requester_filter ],141 )142 vpc_peering_connection_ids += [143 vpc_peering_connection['VpcPeeringConnectionId']144 for vpc_peering_connection in response['VpcPeeringConnections']145 ]146 return vpc_peering_connection_ids147 except Exception as error:148 print(error)149 exit(1)150def delete_vpc_peering_connections(vpc_peering_connection_ids: list) -> bool:151 try:152 client = boto3.client('ec2')153 for vpc_peering_connection_id in vpc_peering_connection_ids:154 client.delete_vpc_peering_connection(155 VpcPeeringConnectionId=vpc_peering_connection_id)156 waiter = client.get_waiter('vpc_peering_connection_deleted')157 waiter.wait(VpcPeeringConnectionIds=vpc_peering_connection_ids)158 return True159 except Exception as error:160 print(error)161 exit(1)162def list_vpc_subnet_ids(vpc_ids: list) -> bool:163 try:164 if not vpc_ids: return []165 client = boto3.client('ec2')166 response = client.describe_subnets(167 Filters=[168 {169 'Name': 'vpc-id',170 'Values': vpc_ids171 },172 ]173 )174 return [175 subnet['SubnetId']176 for subnet in response['Subnets']177 ]178 except Exception as error:179 print(error)180 exit(1)181def delete_vpc_subnets(subnet_ids: list) -> bool:182 try:183 client = boto3.client('ec2')184 for subnet_id in subnet_ids:185 client.delete_subnet(SubnetId=subnet_id)186 return True187 except Exception as error:188 print(error)189 exit(1)190def delete_internet_gateways(vpc_ids: list) -> bool:191 try:192 if not vpc_ids: return []193 client = boto3.client('ec2')194 response = client.describe_internet_gateways(195 Filters=[196 {197 'Name': 'attachment.vpc-id',198 'Values': vpc_ids199 },200 ],201 )202 for internet_gateway in response['InternetGateways']:203 for attachment in internet_gateway['Attachments']:204 client.detach_internet_gateway(205 InternetGatewayId=internet_gateway['InternetGatewayId'],206 VpcId=attachment['VpcId'])207 client.delete_internet_gateway(208 InternetGatewayId=internet_gateway['InternetGatewayId'])209 return True210 except Exception as error:211 print(error)212 exit(1)213def delete_route_tables(vpc_ids: list) -> bool:214 try:215 if not vpc_ids: return []216 client = boto3.client('ec2')217 response = client.describe_route_tables(218 Filters=[219 {220 'Name': 'vpc-id',221 'Values': vpc_ids222 },223 ],224 )225 for route_table in response['RouteTables']:226 if route_table['Associations'] and route_table['Associations'][0]['Main']: continue227 for route in route_table['Routes']:228 if 'GatewayId' in route and route['GatewayId'] == 'local': continue229 client.delete_route(230 DestinationCidrBlock=route['DestinationCidrBlock'],231 RouteTableId=route_table['RouteTableId'])232 client.delete_route_table(RouteTableId=route_table['RouteTableId'])233 return True234 except Exception as error:235 print(error)236 exit(1)237class VpcResources:238 @classmethod239 async def init(cls):240 self = cls()241 self.vpc_ids = list_vpc_ids()242 self.nat_gateway_ids = list_nat_gateway_ids(self.vpc_ids) if self.vpc_ids else []243 self.vpc_endpoint_ids = list_vpc_endpoint_ids(self.vpc_ids) if self.vpc_ids else []244 self.vpc_peering_connection_ids = list_vpc_peering_connection_ids(self.vpc_ids) if self.vpc_ids else []245 self.vpc_subnet_ids = list_vpc_subnet_ids(self.vpc_ids) if self.vpc_ids else []246 return self247 248 def print(self):249 # output will deleted resources list250 print("==== VPC ====")251 pprint(self.vpc_ids)252 print("==== NAT Gateways ====")253 pprint(self.nat_gateway_ids)254 print("==== VPC Endpoints ====")255 pprint(self.vpc_endpoint_ids)256 print("==== VPC Peering Connections ====")257 pprint(self.vpc_peering_connection_ids)258 print("==== VPC Subnets ====")259 pprint(self.vpc_subnet_ids)260 def delete(self):261 # delete nat gateways262 if self.nat_gateway_ids:263 delete_nat_gateways(self.nat_gateway_ids)264 self.nat_gateway_ids = []265 # delete vpc endpoint266 if self.vpc_endpoint_ids:267 delete_vpc_endpoints(self.vpc_endpoint_ids)268 self.vpc_endpoint_ids = []269 # delete vpc peering270 if self.vpc_peering_connection_ids:271 delete_vpc_peering_connections(self.vpc_peering_connection_ids)272 self.vpc_peering_connection_ids = []273 # delete subnets274 if self.vpc_subnet_ids:275 delete_vpc_subnets(self.vpc_subnet_ids)276 self.vpc_subnet_ids = []277 # delete igw -> route-table -> vpc278 if self.vpc_ids:279 delete_internet_gateways(self.vpc_ids)280 delete_route_tables(self.vpc_ids)281 delete_vpcs(self.vpc_ids)

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful