How to use get_network_client method in lisa

Best Python code snippet using lisa_python

provider.py

Source:provider.py Github

copy

Full Screen

...84 cyclades_url = astakos.get_endpoint_url(CycladesComputeClient.service_type)85 compute_client = CycladesComputeClient(cyclades_url, self.auth_token)86 return compute_client87 88 def get_network_client(self):89 astakos = AstakosClient(self.endpoints['astakos'], self.auth_token)90 network_url = astakos.get_endpoint_url(CycladesNetworkClient.service_type)91 network_client = CycladesNetworkClient(network_url, self.auth_token)92 return network_client93 94 def get_identity_client(self):95 return AstakosClient(self.endpoints['astakos'], self.auth_token)96 def add_private_network(self, recreate=True):97 existing_net = self.get_private_network()98 if not recreate and existing_net:99 return existing_net100 101 # if get_private_network():102 # clean network103 project = self.get_project_id()104 network = self.get_network_client()105 net = network.create_network(type='MAC_FILTERED', name='Juju-okeanos private network', project_id=project)106 network.create_subnet(net['id'], '192.168.1.0/24')107 #network.create_subnet(net['id'], '192.168.1.0/24' , gateway_ip='192.168.1.1', 108 # allocation_pools={"start": "192.168.1.2", "end": "192.168.1.254"}, enable_dhcp=True)109 sleep(10)110 return net111 def get_private_network(self):112 network = self.get_network_client()113 for net in network.list_networks(detail=True):114 if not net['public'] and net['name'] == 'Juju-okeanos private network':115 return net116 return None117 def attach_private_ip_to_machine(self, net, vm):118 project = self.get_project_id()119 network = self.get_network_client()120 port = network.create_port(net['id'], vm['id'])121 print("****** Private port for vm with id {} *******".format(vm['id']))122 print(port)123 print("****** port *******")124 port['status'] = network.wait_port(port['id'], port['status'])125 sleep(10)126 return port127 def attach_public_ip_to_machine(self, vm):128 project = self.get_project_id()129 network = self.get_network_client()130 ip = network.create_floatingip(project_id=project)131 print('Reserved new IP {}'.format(ip['floating_ip_address']))132 port = network.create_port(133 network_id=ip['floating_network_id'],134 device_id=vm['id'],135 fixed_ips=[dict(ip_address=ip['floating_ip_address']), ])136 print("****** Public port for vm with id {} *******".format(vm['id']))137 print(port)138 print("****** port *******")139 port['status'] = network.wait_port(port['id'], port['status'])140 sleep(10)141 return port142 def set_internal_gw(self, vm):143 self.remote_run(vm, ["route del default"])...

Full Screen

Full Screen

azure_networking_base.py

Source:azure_networking_base.py Github

copy

Full Screen

...11 cred_object = static_handler_utils.StaticHandlers.read_secret()12 credential = ClientHandlers.login_to_azure(**cred_object)13 network_list = []14 final_list = []15 for element in ClientHandlers.get_network_client(credential,16 self.subscription_id).virtual_networks.list_all():17 network_list.append(element)18 if len(network_list) > 0:19 for network in network_list:20 final_list.append((network.__dict__['id'], network.__dict__['name'],21 network.__dict__['address_space'].__dict__['address_prefixes'][0]))22 return final_list23 else:24 return ClientHandlers.static_reply()25 except CloudError as e:26 logging.error((str(e)))27 def getting_vnet_id(self,vnet_name):28 try:29 vnet_list = self.list_all_vnet()30 for i in vnet_list:31 if i[1] == vnet_name:32 return i[0]33 except CloudError as e :34 logging.error(str(e))35 def get_vnet_resource_group(self, vnet_name):36 try:37 for i in self.list_all_vnet(): # provides Id of each virtual network38 if (i[0].split('/')[8]) == vnet_name:39 return i[0].split('/')[4] # getting resource group name40 except CloudError as e:41 logging.error(e)42 def get_peering_networks(self,vnet_name):43 try:44 peering_list = []45 cred_object = static_handler_utils.StaticHandlers.read_secret()46 credential = ClientHandlers.login_to_azure(**cred_object)47 rg_name = self.get_vnet_resource_group(vnet_name) # getting resource group for virtual network48 if rg_name != None:49 peering_object = ClientHandlers.get_network_client(credential,50 self.subscription_id).virtual_network_peerings.list(51 resource_group_name=rg_name, virtual_network_name=vnet_name)52 for i in peering_object:53 peering_list.append(i.__dict__['id'].split('/')[10].split('-to-')[1])54 if len(peering_list) > 0:55 return peering_list56 else:57 return ClientHandlers.static_reply()58 except CloudError as e:59 logging.error(e)60 def get_vnet_details(self,vnet_name):61 try:62 cred_object = static_handler_utils.StaticHandlers.read_secret()63 credential = ClientHandlers.login_to_azure(**cred_object)64 rg_name = self.get_vnet_resource_group(vnet_name)65 if rg_name != None :66 subnet_det = []67 virtual_network = ClientHandlers.get_network_client(credential,68 self.subscription_id).virtual_networks.get(69 resource_group_name=rg_name, virtual_network_name=vnet_name).__dict__70 subnet_details = virtual_network['subnets']71 for subnet in subnet_details:72 subnet_det.append((subnet.__dict__['address_prefix'], subnet.__dict__['id'].split('/')[10]))73 return [virtual_network['address_space'].__dict__['address_prefixes'], subnet_det]74 else :75 return ClientHandlers.static_reply()76 except CloudError as e:77 logging.error(e)78 def get_network_interface(self,nic_id):79 try:80 network_interface_details = {}81 nic_name = nic_id.split('/')[-1]82 rg_name = nic_id.split('/')[4]83 cred_object = static_handler_utils.StaticHandlers.read_secret()84 credential = ClientHandlers.login_to_azure(**cred_object)85 network_interface = ClientHandlers.get_network_client(credential,self.subscription_id).network_interfaces.\86 get(resource_group_name=rg_name,network_interface_name=nic_name)87 for nic in network_interface.ip_configurations:88 network_interface_details['subnet_id'] = nic.subnet.id89 network_interface_details["private_ip"] = nic.private_ip_address90 if nic.public_ip_address is None:91 network_interface_details["public_ip"] = nic.public_ip_address92 else:93 public_ip = nic.public_ip_address.__dict__94 id = public_ip['id']95 public_ip_name = id.split('/')[-1]96 public_rg_name = id.split('/')[4]97 public_ip_details = ClientHandlers.get_network_client(credential, self.subscription_id).\98 public_ip_addresses.\99 get(resource_group_name= public_rg_name ,public_ip_address_name= public_ip_name)100 network_interface_details["public_ip"] = public_ip_details.__dict__['ip_address']101 return network_interface_details102 except CloudError as e:103 logging.error(e)104 def get_subnet_details(self,subnet_id):105 try:106 subnet_details =[]107 vnet_rg_name = subnet_id.split('/')[4]108 vnet_name = subnet_id.split('/')[8]109 subnet_name = subnet_id.split('/')[10]110 cred_object = static_handler_utils.StaticHandlers.read_secret()111 credential = ClientHandlers.login_to_azure(**cred_object)112 subnet = ClientHandlers.get_network_client(credential,self.subscription_id).subnets.get(resource_group_name=vnet_rg_name,virtual_network_name= vnet_name, subnet_name=subnet_name)113 logging.info(subnet)114 subnet_details.append(subnet.name)115 subnet_details.append(subnet.address_prefix)116 if subnet.__dict__['network_security_group'] is None:117 logging.warning("! No NSG in subnet ")118 subnet_details.append(subnet.__dict__['network_security_group'])119 else:120 nsg_id = subnet.__dict__['network_security_group'].id121 subnet_details.append(nsg_id.split('/')[-1])122 return subnet_details123 except CloudError as e:...

Full Screen

Full Screen

ip.py

Source:ip.py Github

copy

Full Screen

...22 instance = get_instance_id(scaleset, machine_id)23 if not isinstance(instance, str):24 return None25 resource_group = get_base_resource_group()26 client = get_network_client()27 intf = client.network_interfaces.list_virtual_machine_scale_set_network_interfaces(28 resource_group, str(scaleset)29 )30 try:31 for interface in intf:32 resource = parse_resource_id(interface.virtual_machine.id)33 if resource.get("resource_name") != instance:34 continue35 for config in interface.ip_configurations:36 if config.private_ip_address is None:37 continue38 return str(config.private_ip_address)39 except (ResourceNotFoundError, CloudError):40 # this can fail if an interface is removed during the iteration41 pass42 return None43def get_ip(resource_group: str, name: str) -> Optional[Any]:44 logging.info("getting ip %s:%s", resource_group, name)45 network_client = get_network_client()46 try:47 return network_client.public_ip_addresses.get(resource_group, name)48 except (ResourceNotFoundError, CloudError):49 return None50def delete_ip(resource_group: str, name: str) -> Any:51 logging.info("deleting ip %s:%s", resource_group, name)52 network_client = get_network_client()53 return network_client.public_ip_addresses.begin_delete(resource_group, name)54def create_ip(resource_group: str, name: str, region: Region) -> Any:55 logging.info("creating ip for %s:%s in %s", resource_group, name, region)56 network_client = get_network_client()57 params: Dict[str, Union[str, Dict[str, str]]] = {58 "location": region,59 "public_ip_allocation_method": "Dynamic",60 }61 if "ONEFUZZ_OWNER" in os.environ:62 params["tags"] = {"OWNER": os.environ["ONEFUZZ_OWNER"]}63 return network_client.public_ip_addresses.begin_create_or_update(64 resource_group, name, params65 )66def get_public_nic(resource_group: str, name: str) -> Optional[Any]:67 logging.info("getting nic: %s %s", resource_group, name)68 network_client = get_network_client()69 try:70 return network_client.network_interfaces.get(resource_group, name)71 except (ResourceNotFoundError, CloudError):72 return None73def delete_nic(resource_group: str, name: str) -> Optional[Any]:74 logging.info("deleting nic %s:%s", resource_group, name)75 network_client = get_network_client()76 return network_client.network_interfaces.begin_delete(resource_group, name)77def create_public_nic(78 resource_group: str, name: str, region: Region, nsg: Optional[NSG]79) -> Optional[Error]:80 logging.info("creating nic for %s:%s in %s", resource_group, name, region)81 network = Network(region)82 subnet_id = network.get_id()83 if subnet_id is None:84 network.create()85 return None86 if nsg:87 subnet = network.get_subnet()88 if isinstance(subnet, Subnet) and not subnet.network_security_group:89 result = nsg.associate_subnet(network.get_vnet(), subnet)90 if isinstance(result, Error):91 return result92 return None93 ip = get_ip(resource_group, name)94 if not ip:95 create_ip(resource_group, name, region)96 return None97 params = {98 "location": region,99 "ip_configurations": [100 {101 "name": "myIPConfig",102 "public_ip_address": ip,103 "subnet": {"id": subnet_id},104 }105 ],106 }107 if "ONEFUZZ_OWNER" in os.environ:108 params["tags"] = {"OWNER": os.environ["ONEFUZZ_OWNER"]}109 network_client = get_network_client()110 try:111 network_client.network_interfaces.begin_create_or_update(112 resource_group, name, params113 )114 except (ResourceNotFoundError, CloudError) as err:115 if "RetryableError" not in repr(err):116 return Error(117 code=ErrorCode.VM_CREATE_FAILED,118 errors=["unable to create nic: %s" % err],119 )120 return None121def get_public_ip(resource_id: str) -> Optional[str]:122 logging.info("getting ip for %s", resource_id)123 network_client = get_network_client()124 resource = parse_resource_id(resource_id)125 ip = (126 network_client.network_interfaces.get(127 resource["resource_group"], resource["name"]128 )129 .ip_configurations[0]130 .public_ip_address131 )132 resource = parse_resource_id(ip.id)133 ip = network_client.public_ip_addresses.get(134 resource["resource_group"], resource["name"]135 ).ip_address136 if ip is None:137 return None...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run lisa automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful