How to use describe_client_vpn_connections method in localstack

Best Python code snippet using localstack_python

main.py

Source:main.py Github

copy

Full Screen

1import boto32import logging3from os import environ45logger = logging.getLogger()6logger.setLevel(logging.INFO)78ec2 = boto3.client('ec2')9route53 = boto3.client('route53')1011client_vpn_endpoint_id = environ.get('CLIENT_VPN_ENDPOINT_ID')12if client_vpn_endpoint_id == None:13 logger.error('Environment variable "CLIENT_VPN_ENDPOINT_ID" must be set')14 raise Exception(15 'Environment variable "CLIENT_VPN_ENDPOINT_ID" must be set')1617hosted_zone_id = environ.get('HOSTED_ZONE_ID')18if hosted_zone_id == None:19 logger.error('Environment variable "HOSTED_ZONE_ID" must be set')20 raise Exception(21 'Environment variable "HOSTED_ZONE_ID" must be set')2223hosted_zone_name = environ.get('HOSTED_ZONE_NAME')24if hosted_zone_name == None:25 logger.error('Environment variable "HOSTED_ZONE_NAME" must be set')26 raise Exception(27 'Environment variable "HOSTED_ZONE_NAME" must be set')282930def lambda_handler(event, context):31 paginator = ec2.get_paginator('describe_client_vpn_connections')32 page_iterator = paginator.paginate(33 ClientVpnEndpointId=client_vpn_endpoint_id34 )35 connections = []36 for page in page_iterator:37 connections += page['Connections']3839 clients = {}40 for connection in connections:41 # Ignore connections that aren't in active status42 if connection['Status']['Code'] != 'active':43 continue44 common_name = connection['CommonName']45 dns_name = "{}.{}.".format(common_name, hosted_zone_name).lower()46 # If a connection with this Common Name already exists, overwrite it if this one started earlier47 if dns_name in clients:48 if connection['ConnectionEstablishedTime'] < clients[dns_name]['ConnectionEstablishedTime']:49 clients[dns_name] = connection50 else:51 clients[dns_name] = connection5253 logger.debug(clients)5455 paginator = route53.get_paginator('list_resource_record_sets')56 page_iterator = paginator.paginate(57 HostedZoneId=hosted_zone_id58 )59 record_sets = []60 for page in page_iterator:61 record_sets += page['ResourceRecordSets']6263 logger.debug(record_sets)6465 # Find all existing record sets that don't have a corresponding client66 sets_to_delete = []67 for record_set in record_sets:68 if record_set['Name'] not in clients and record_set['Type'] == 'A':69 sets_to_delete.append({70 'Action': 'DELETE',71 'ResourceRecordSet': record_set72 })7374 existing_clients = {record_set['Name']: record_set for record_set in record_sets}7576 sets_to_upsert = []77 # Loop through all connected clients78 for dns_name in clients:79 # Every client gets an upserted record if there isn't already a record or the existing record doesn't match the IP80 if dns_name not in existing_clients or existing_clients[dns_name]['ResourceRecords'][0]['Value'] != clients[dns_name]['ClientIp']:81 sets_to_upsert.append({82 'Action': 'UPSERT',83 'ResourceRecordSet': {84 'Name': dns_name,85 'Type': 'A',86 'TTL': 0,87 'ResourceRecords': [88 {89 'Value': clients[dns_name]['ClientIp']90 },91 ]92 }93 })9495 sets_to_change = sets_to_delete + sets_to_upsert9697 if len(sets_to_change) > 0:98 logger.info("Changing sets: {}".format(sets_to_change))99 response = route53.change_resource_record_sets(100 HostedZoneId=hosted_zone_id,101 ChangeBatch={102 'Changes': sets_to_change103 }104 ) ...

Full Screen

Full Screen

lambda_function.py

Source:lambda_function.py Github

copy

Full Screen

...33 except URLError as e:34 logger.error("Server connection failed: %s", e.reason)35 return 036rout353client = boto3.client('route53')37response = client.describe_client_vpn_connections(38 ClientVpnEndpointId=Clientvpnendpoint39)40print(response)41def lambda_handler(event, context):42 length = len(response["Connections"])43 print("length", length)44 client_IpAddress = []45 commonName = []46 for i in range(length):47 if (response["Connections"][i]["Status"]["Code"]) == "active":48 client_IpAddress = response["Connections"][i]["ClientIp"]49 commonName = response["Connections"][i]["CommonName"]50 #print(client_IpAddress, commonName)51 subdomain = Sub_domain_name...

Full Screen

Full Screen

app.py

Source:app.py Github

copy

Full Screen

...11 try:12 if event['AssociateSubnets'] == 'false':13 logger.info(f"terminating current vpn sessions to {event['ClientVpnEndpointId']}")14 ec2 = boto3.client('ec2')15 resp = ec2.describe_client_vpn_connections(ClientVpnEndpointId=event['ClientVpnEndpointId'])16 for conn in resp['Connections']:17 if conn['Status']['Code'] == 'active':18 ec2.terminate_client_vpn_connections(19 ClientVpnEndpointId=event['ClientVpnEndpointId'],20 ConnectionId=conn['ConnectionId']21 )22 logger.info(f"terminated session {conn['ConnectionId']}")23 client = boto3.client('cloudformation')24 logger.info(client.update_stack(25 StackName=event['StackName'],26 UsePreviousTemplate=True,27 Capabilities=['CAPABILITY_IAM'],28 Parameters=[29 {...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful