How to use _create_tenant_network method in tempest

Best Python code snippet using tempest_python

test_security_groups_basic_ops.py

Source:test_security_groups_basic_ops.py Github

copy

Full Screen

...272 floating_ip = self.create_floating_ip(273 server, public_network_id,274 client=tenant.manager.floating_ips_client)275 self.floating_ips.setdefault(server['id'], floating_ip)276 def _create_tenant_network(self, tenant, port_security_enabled=True):277 network, subnet, router = self.create_networks(278 networks_client=tenant.manager.networks_client,279 routers_client=tenant.manager.routers_client,280 subnets_client=tenant.manager.subnets_client,281 port_security_enabled=port_security_enabled)282 tenant.set_network(network, subnet, router)283 def _deploy_tenant(self, tenant_or_id):284 """creates:285 network286 subnet287 router (if public not defined)288 access security group289 access-point server290 """291 if not isinstance(tenant_or_id, self.TenantProperties):292 tenant = self.tenants[tenant_or_id]293 else:294 tenant = tenant_or_id295 self._create_tenant_keypairs(tenant)296 self._create_tenant_network(tenant)297 self._create_tenant_security_groups(tenant)298 self._set_access_point(tenant)299 def _get_server_ip(self, server, floating=False):300 """returns the ip (floating/internal) of a server"""301 if floating:302 server_ip = self.floating_ips[server['id']]['floating_ip_address']303 else:304 server_ip = None305 network_name = self.tenants[server['tenant_id']].network['name']306 if network_name in server['addresses']:307 server_ip = server['addresses'][network_name][0]['addr']308 return server_ip309 def _connect_to_access_point(self, tenant):310 """create ssh connection to tenant access point"""311 access_point_ssh = \312 self.floating_ips[tenant.access_point['id']]['floating_ip_address']313 private_key = tenant.keypair['private_key']314 access_point_ssh = self.get_remote_client(315 access_point_ssh, private_key=private_key)316 return access_point_ssh317 def _check_connectivity(self, access_point, ip, should_succeed=True):318 if should_succeed:319 msg = "Timed out waiting for %s to become reachable" % ip320 else:321 msg = "%s is reachable" % ip322 self.assertTrue(self._check_remote_connectivity(access_point, ip,323 should_succeed), msg)324 def _test_in_tenant_block(self, tenant):325 access_point_ssh = self._connect_to_access_point(tenant)326 for server in tenant.servers:327 self._check_connectivity(access_point=access_point_ssh,328 ip=self._get_server_ip(server),329 should_succeed=False)330 def _test_in_tenant_allow(self, tenant):331 ruleset = dict(332 protocol='icmp',333 remote_group_id=tenant.security_groups['default']['id'],334 direction='ingress'335 )336 self._create_security_group_rule(337 secgroup=tenant.security_groups['default'],338 security_groups_client=tenant.manager.security_groups_client,339 **ruleset340 )341 access_point_ssh = self._connect_to_access_point(tenant)342 for server in tenant.servers:343 self._check_connectivity(access_point=access_point_ssh,344 ip=self._get_server_ip(server))345 def _test_cross_tenant_block(self, source_tenant, dest_tenant):346 # if public router isn't defined, then dest_tenant access is via347 # floating-ip348 access_point_ssh = self._connect_to_access_point(source_tenant)349 ip = self._get_server_ip(dest_tenant.access_point,350 floating=self.floating_ip_access)351 self._check_connectivity(access_point=access_point_ssh, ip=ip,352 should_succeed=False)353 def _test_cross_tenant_allow(self, source_tenant, dest_tenant):354 """check for each direction:355 creating rule for tenant incoming traffic enables only 1way traffic356 """357 ruleset = dict(358 protocol='icmp',359 direction='ingress'360 )361 sec_group_rules_client = (362 dest_tenant.manager.security_group_rules_client)363 self._create_security_group_rule(364 secgroup=dest_tenant.security_groups['default'],365 sec_group_rules_client=sec_group_rules_client,366 **ruleset367 )368 access_point_ssh = self._connect_to_access_point(source_tenant)369 ip = self._get_server_ip(dest_tenant.access_point,370 floating=self.floating_ip_access)371 self._check_connectivity(access_point_ssh, ip)372 # test that reverse traffic is still blocked373 self._test_cross_tenant_block(dest_tenant, source_tenant)374 # allow reverse traffic and check375 sec_group_rules_client = (376 source_tenant.manager.security_group_rules_client)377 self._create_security_group_rule(378 secgroup=source_tenant.security_groups['default'],379 sec_group_rules_client=sec_group_rules_client,380 **ruleset381 )382 access_point_ssh_2 = self._connect_to_access_point(dest_tenant)383 ip = self._get_server_ip(source_tenant.access_point,384 floating=self.floating_ip_access)385 self._check_connectivity(access_point_ssh_2, ip)386 def _verify_mac_addr(self, tenant):387 """Verify that VM has the same ip, mac as listed in port"""388 access_point_ssh = self._connect_to_access_point(tenant)389 mac_addr = access_point_ssh.get_mac_address()390 mac_addr = mac_addr.strip().lower()391 # Get the fixed_ips and mac_address fields of all ports. Select392 # only those two columns to reduce the size of the response.393 port_list = self._list_ports(fields=['fixed_ips', 'mac_address'])394 port_detail_list = [395 (port['fixed_ips'][0]['subnet_id'],396 port['fixed_ips'][0]['ip_address'],397 port['mac_address'].lower())398 for port in port_list if port['fixed_ips']399 ]400 server_ip = self._get_server_ip(tenant.access_point)401 subnet_id = tenant.subnet['id']402 self.assertIn((subnet_id, server_ip, mac_addr), port_detail_list)403 @decorators.idempotent_id('e79f879e-debb-440c-a7e4-efeda05b6848')404 @test.services('compute', 'network')405 def test_cross_tenant_traffic(self):406 if not self.credentials_provider.is_multi_tenant():407 raise self.skipException("No secondary tenant defined")408 try:409 # deploy new project410 self._deploy_tenant(self.alt_tenant)411 self._verify_network_details(self.alt_tenant)412 self._verify_mac_addr(self.alt_tenant)413 # cross tenant check414 source_tenant = self.primary_tenant415 dest_tenant = self.alt_tenant416 self._test_cross_tenant_block(source_tenant, dest_tenant)417 self._test_cross_tenant_allow(source_tenant, dest_tenant)418 except Exception:419 for tenant in self.tenants.values():420 self._log_console_output(servers=tenant.servers)421 raise422 @decorators.idempotent_id('63163892-bbf6-4249-aa12-d5ea1f8f421b')423 @test.services('compute', 'network')424 def test_in_tenant_traffic(self):425 try:426 self._create_tenant_servers(self.primary_tenant, num=1)427 # in-tenant check428 self._test_in_tenant_block(self.primary_tenant)429 self._test_in_tenant_allow(self.primary_tenant)430 except Exception:431 for tenant in self.tenants.values():432 self._log_console_output(servers=tenant.servers)433 raise434 @decorators.idempotent_id('f4d556d7-1526-42ad-bafb-6bebf48568f6')435 @test.services('compute', 'network')436 def test_port_update_new_security_group(self):437 """Verifies the traffic after updating the vm port438 With new security group having appropriate rule.439 """440 new_tenant = self.primary_tenant441 # Create empty security group and add icmp rule in it442 new_sg = self._create_empty_security_group(443 namestart='secgroup_new-',444 tenant_id=new_tenant.creds.tenant_id,445 client=new_tenant.manager.security_groups_client)446 icmp_rule = dict(447 protocol='icmp',448 direction='ingress',449 )450 sec_group_rules_client = new_tenant.manager.security_group_rules_client451 self._create_security_group_rule(452 secgroup=new_sg,453 sec_group_rules_client=sec_group_rules_client,454 **icmp_rule)455 new_tenant.security_groups.update(new_sg=new_sg)456 # Create server with default security group457 name = 'server-{tenant}-gen-1'.format(458 tenant=new_tenant.creds.tenant_name459 )460 name = data_utils.rand_name(name)461 server = self._create_server(name, new_tenant,462 [new_tenant.security_groups['default']])463 # Check connectivity failure with default security group464 try:465 access_point_ssh = self._connect_to_access_point(new_tenant)466 self._check_connectivity(access_point=access_point_ssh,467 ip=self._get_server_ip(server),468 should_succeed=False)469 server_id = server['id']470 port_id = self._list_ports(device_id=server_id)[0]['id']471 # update port with new security group and check connectivity472 self.ports_client.update_port(port_id, security_groups=[473 new_tenant.security_groups['new_sg']['id']])474 self._check_connectivity(475 access_point=access_point_ssh,476 ip=self._get_server_ip(server))477 except Exception:478 for tenant in self.tenants.values():479 self._log_console_output(servers=tenant.servers)480 raise481 @decorators.idempotent_id('d2f77418-fcc4-439d-b935-72eca704e293')482 @test.services('compute', 'network')483 def test_multiple_security_groups(self):484 """Verify multiple security groups and checks that rules485 provided in the both the groups is applied onto VM486 """487 tenant = self.primary_tenant488 ip = self._get_server_ip(tenant.access_point,489 floating=self.floating_ip_access)490 ssh_login = CONF.validation.image_ssh_user491 private_key = tenant.keypair['private_key']492 self.check_vm_connectivity(ip,493 should_connect=False)494 ruleset = dict(495 protocol='icmp',496 direction='ingress'497 )498 self._create_security_group_rule(499 secgroup=tenant.security_groups['default'],500 **ruleset501 )502 # NOTE: Vm now has 2 security groups one with ssh rule(503 # already added in setUp() method),and other with icmp rule504 # (added in the above step).The check_vm_connectivity tests505 # -that vm ping test is successful506 # -ssh to vm is successful507 self.check_vm_connectivity(ip,508 username=ssh_login,509 private_key=private_key,510 should_connect=True)511 @test.requires_ext(service='network', extension='port-security')512 @decorators.idempotent_id('7c811dcc-263b-49a3-92d2-1b4d8405f50c')513 @test.services('compute', 'network')514 def test_port_security_disable_security_group(self):515 """Verify the default security group rules is disabled."""516 new_tenant = self.primary_tenant517 # Create server518 name = 'server-{tenant}-gen-1'.format(519 tenant=new_tenant.creds.tenant_name520 )521 name = data_utils.rand_name(name)522 server = self._create_server(name, new_tenant,523 [new_tenant.security_groups['default']])524 access_point_ssh = self._connect_to_access_point(new_tenant)525 server_id = server['id']526 port_id = self._list_ports(device_id=server_id)[0]['id']527 # Flip the port's port security and check connectivity528 try:529 self.ports_client.update_port(port_id,530 port_security_enabled=True,531 security_groups=[])532 self._check_connectivity(access_point=access_point_ssh,533 ip=self._get_server_ip(server),534 should_succeed=False)535 self.ports_client.update_port(port_id,536 port_security_enabled=False,537 security_groups=[])538 self._check_connectivity(539 access_point=access_point_ssh,540 ip=self._get_server_ip(server))541 except Exception:542 for tenant in self.tenants.values():543 self._log_console_output(servers=tenant.servers)544 raise545 @test.requires_ext(service='network', extension='port-security')546 @decorators.idempotent_id('13ccf253-e5ad-424b-9c4a-97b88a026699')547 @testtools.skipUnless(548 CONF.compute_feature_enabled.allow_port_security_disabled,549 'Port security must be enabled.')550 # TODO(mriedem): We shouldn't actually need to check this since neutron551 # disables the port_security extension by default, but the problem is nova552 # assumes port_security_enabled=True if it's not set on the network553 # resource, which will mean nova may attempt to apply a security group on554 # a port on that network which would fail. This is really a bug in nova.555 @testtools.skipUnless(556 CONF.network_feature_enabled.port_security,557 'Port security must be enabled.')558 @test.services('compute', 'network')559 def test_boot_into_disabled_port_security_network_without_secgroup(self):560 tenant = self.primary_tenant561 self._create_tenant_network(tenant, port_security_enabled=False)562 self.assertFalse(tenant.network['port_security_enabled'])563 name = data_utils.rand_name('server-smoke')564 sec_groups = []565 server = self._create_server(name, tenant, sec_groups)566 server_id = server['id']567 ports = self._list_ports(device_id=server_id)568 self.assertEqual(1, len(ports))569 for port in ports:570 self.assertEmpty(port['security_groups'],571 "Neutron shouldn't even use it's default sec "...

Full Screen

Full Screen

manageNeutron.py

Source:manageNeutron.py Github

copy

Full Screen

...59 }60 neutronClient.create_subnet(external_subnet)61 print "Created %s subnet" % PHYSICAL_SUBNET_NAME62 #Creating the default tenant network and subnet.63 tenant_subnet_id = _create_tenant_network()64 #Creating the router65 _create_router(tenant_subnet_id, external_net_id)66 else:67 print "This function has not been implented yet, try running with the -a option or -h option for help"68def delete(args):69 global neutronClient70 # Getting IDs of all the resources and mapping them out, since71 # the neutron api only accepts ids and not resource names.72 networks = neutronClient.list_networks()73 subnets = neutronClient.list_subnets()74 routers = neutronClient.list_routers()75 network_map = {}76 subnet_map = {}77 router_map = {}78 for network in networks['networks']:79 network_map[network['name']] = network['id']80 for subnet in subnets['subnets']:81 subnet_map[subnet['name']] = subnet['id']82 for router in routers['routers']:83 router_map[router['name']] = router['id']84 print "Make sure all instances or associated neutron ports are deleted before running this operation"85 # If -a options is set, remove the default networking components86 if args.all:87 _delete_router(router_map, subnet_map)88 # Deleting default networks (eventually check if they exists)89 _delete_network(network_map, PHYSICAL_NETWORK_NAME)90 _delete_network(network_map, TENANT_NETWORK_NAME)91def debug(args):92 print "This program will eventually help with debugging environments and such..."93 global neutronClient94 if args.restart_services:95 _restart_neutron_services(args.inventory)96# List all the networks. Mainly for development testing purposes.97def list(args):98 global neutronClient99 networks = neutronClient.list_networks()100 pprint.pprint(networks)101 for nets in networks['networks']:102 print nets['name']103#Private helper functions104# This function creates a vxlan tenant network with it's corresponding subnet105def _create_tenant_network(subnet_cidr="10.10.10.0/24", net_name=TENANT_NETWORK_NAME, subnet_name=TENANT_SUBNET_NAME):106 global neutronClient107 # The JSON object representation of the tenant network108 tenant_network = {109 "network": {110 "name": net_name,111 "provider:network_type": "vxlan",112 }113 }114 response = neutronClient.create_network(tenant_network)115 print "Created %s network" % net_name116 #Retriving the UUID of the tenant_network117 tenant_network_id = response['network']['id']118 # The JSON object representation of the tenant subnet119 tenant_subnet = {...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run tempest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful