How to use _deploy_tenant method in tempest

Best Python code snippet using tempest_python

test_security_groups_basic_ops.py

Source:test_security_groups_basic_ops.py Github

copy

Full Screen

...139 def cleanup_wrapper(self, resource):140 self.cleanup_resource(resource, self.__class__.__name__)141 def setUp(self):142 super(TestSecurityGroupsBasicOps, self).setUp()143 self._deploy_tenant(self.primary_tenant)144 self._verify_network_details(self.primary_tenant)145 self._verify_mac_addr(self.primary_tenant)146 def _create_tenant_keypairs(self, tenant):147 keypair = self.create_keypair(tenant.manager.keypairs_client)148 tenant.keypair = keypair149 def _create_tenant_security_groups(self, tenant):150 access_sg = self._create_empty_security_group(151 namestart='secgroup_access-',152 tenant_id=tenant.creds.tenant_id,153 client=tenant.manager.network_client154 )155 # don't use default secgroup since it allows in-tenant traffic156 def_sg = self._create_empty_security_group(157 namestart='secgroup_general-',158 tenant_id=tenant.creds.tenant_id,159 client=tenant.manager.network_client160 )161 tenant.security_groups.update(access=access_sg, default=def_sg)162 ssh_rule = dict(163 protocol='tcp',164 port_range_min=22,165 port_range_max=22,166 direction='ingress',167 )168 self._create_security_group_rule(secgroup=access_sg,169 client=tenant.manager.network_client,170 **ssh_rule)171 def _verify_network_details(self, tenant):172 # Checks that we see the newly created network/subnet/router via173 # checking the result of list_[networks,routers,subnets]174 # Check that (router, subnet) couple exist in port_list175 seen_nets = self._list_networks()176 seen_names = [n['name'] for n in seen_nets]177 seen_ids = [n['id'] for n in seen_nets]178 self.assertIn(tenant.network.name, seen_names)179 self.assertIn(tenant.network.id, seen_ids)180 seen_subnets = [(n['id'], n['cidr'], n['network_id'])181 for n in self._list_subnets()]182 mysubnet = (tenant.subnet.id, tenant.subnet.cidr, tenant.network.id)183 self.assertIn(mysubnet, seen_subnets)184 seen_routers = self._list_routers()185 seen_router_ids = [n['id'] for n in seen_routers]186 seen_router_names = [n['name'] for n in seen_routers]187 self.assertIn(tenant.router.name, seen_router_names)188 self.assertIn(tenant.router.id, seen_router_ids)189 myport = (tenant.router.id, tenant.subnet.id)190 router_ports = [(i['device_id'], i['fixed_ips'][0]['subnet_id']) for i191 in self._list_ports()192 if self._is_router_port(i)]193 self.assertIn(myport, router_ports)194 def _is_router_port(self, port):195 """Return True if port is a router interface."""196 # NOTE(armando-migliaccio): match device owner for both centralized197 # and distributed routers; 'device_owner' is "" by default.198 return port['device_owner'].startswith('network:router_interface')199 def _create_server(self, name, tenant, security_groups=None):200 """201 creates a server and assigns to security group202 """203 self._set_compute_context(tenant)204 if security_groups is None:205 security_groups = [tenant.security_groups['default']]206 create_kwargs = {207 'networks': [208 {'uuid': tenant.network.id},209 ],210 'key_name': tenant.keypair['name'],211 'security_groups': security_groups,212 'tenant_id': tenant.creds.tenant_id213 }214 server = self.create_server(name=name, create_kwargs=create_kwargs)215 self.assertEqual(216 sorted([s['name'] for s in security_groups]),217 sorted([s['name'] for s in server['security_groups']]))218 return server219 def _create_tenant_servers(self, tenant, num=1):220 for i in range(num):221 name = 'server-{tenant}-gen-{num}-'.format(222 tenant=tenant.creds.tenant_name,223 num=i224 )225 name = data_utils.rand_name(name)226 server = self._create_server(name, tenant)227 tenant.servers.append(server)228 def _set_access_point(self, tenant):229 """230 creates a server in a secgroup with rule allowing external ssh231 in order to access tenant internal network232 workaround ip namespace233 """234 secgroups = tenant.security_groups.values()235 name = 'server-{tenant}-access_point-'.format(236 tenant=tenant.creds.tenant_name)237 name = data_utils.rand_name(name)238 server = self._create_server(name, tenant,239 security_groups=secgroups)240 tenant.access_point = server241 self._assign_floating_ips(tenant, server)242 def _assign_floating_ips(self, tenant, server):243 public_network_id = CONF.network.public_network_id244 floating_ip = self._create_floating_ip(245 server, public_network_id,246 client=tenant.manager.network_client)247 self.floating_ips.setdefault(server['id'], floating_ip)248 def _create_tenant_network(self, tenant):249 network, subnet, router = self.create_networks(250 client=tenant.manager.network_client)251 tenant.set_network(network, subnet, router)252 def _set_compute_context(self, tenant):253 self.servers_client = tenant.manager.servers_client254 return self.servers_client255 def _deploy_tenant(self, tenant_or_id):256 """257 creates:258 network259 subnet260 router (if public not defined)261 access security group262 access-point server263 """264 if not isinstance(tenant_or_id, self.TenantProperties):265 tenant = self.tenants[tenant_or_id]266 else:267 tenant = tenant_or_id268 self._set_compute_context(tenant)269 self._create_tenant_keypairs(tenant)270 self._create_tenant_network(tenant)271 self._create_tenant_security_groups(tenant)272 self._set_access_point(tenant)273 def _get_server_ip(self, server, floating=False):274 """275 returns the ip (floating/internal) of a server276 """277 if floating:278 server_ip = self.floating_ips[server['id']].floating_ip_address279 else:280 server_ip = None281 network_name = self.tenants[server['tenant_id']].network.name282 if network_name in server['addresses']:283 server_ip = server['addresses'][network_name][0]['addr']284 return server_ip285 def _connect_to_access_point(self, tenant):286 """287 create ssh connection to tenant access point288 """289 access_point_ssh = \290 self.floating_ips[tenant.access_point['id']].floating_ip_address291 private_key = tenant.keypair['private_key']292 access_point_ssh = self._ssh_to_server(access_point_ssh,293 private_key=private_key)294 return access_point_ssh295 def _check_connectivity(self, access_point, ip, should_succeed=True):296 if should_succeed:297 msg = "Timed out waiting for %s to become reachable" % ip298 else:299 msg = "%s is reachable" % ip300 try:301 self.assertTrue(self._check_remote_connectivity(access_point, ip,302 should_succeed),303 msg)304 except test.exceptions.SSHTimeout:305 raise306 except Exception:307 debug.log_net_debug()308 raise309 def _test_in_tenant_block(self, tenant):310 access_point_ssh = self._connect_to_access_point(tenant)311 for server in tenant.servers:312 self._check_connectivity(access_point=access_point_ssh,313 ip=self._get_server_ip(server),314 should_succeed=False)315 def _test_in_tenant_allow(self, tenant):316 ruleset = dict(317 protocol='icmp',318 remote_group_id=tenant.security_groups['default'].id,319 direction='ingress'320 )321 self._create_security_group_rule(322 secgroup=tenant.security_groups['default'],323 **ruleset324 )325 access_point_ssh = self._connect_to_access_point(tenant)326 for server in tenant.servers:327 self._check_connectivity(access_point=access_point_ssh,328 ip=self._get_server_ip(server))329 def _test_cross_tenant_block(self, source_tenant, dest_tenant):330 """331 if public router isn't defined, then dest_tenant access is via332 floating-ip333 """334 access_point_ssh = self._connect_to_access_point(source_tenant)335 ip = self._get_server_ip(dest_tenant.access_point,336 floating=self.floating_ip_access)337 self._check_connectivity(access_point=access_point_ssh, ip=ip,338 should_succeed=False)339 def _test_cross_tenant_allow(self, source_tenant, dest_tenant):340 """341 check for each direction:342 creating rule for tenant incoming traffic enables only 1way traffic343 """344 ruleset = dict(345 protocol='icmp',346 direction='ingress'347 )348 self._create_security_group_rule(349 secgroup=dest_tenant.security_groups['default'],350 client=dest_tenant.manager.network_client,351 **ruleset352 )353 access_point_ssh = self._connect_to_access_point(source_tenant)354 ip = self._get_server_ip(dest_tenant.access_point,355 floating=self.floating_ip_access)356 self._check_connectivity(access_point_ssh, ip)357 # test that reverse traffic is still blocked358 self._test_cross_tenant_block(dest_tenant, source_tenant)359 # allow reverse traffic and check360 self._create_security_group_rule(361 secgroup=source_tenant.security_groups['default'],362 client=source_tenant.manager.network_client,363 **ruleset364 )365 access_point_ssh_2 = self._connect_to_access_point(dest_tenant)366 ip = self._get_server_ip(source_tenant.access_point,367 floating=self.floating_ip_access)368 self._check_connectivity(access_point_ssh_2, ip)369 def _verify_mac_addr(self, tenant):370 """371 verify that VM (tenant's access point) has the same ip,mac as listed in372 port list373 """374 access_point_ssh = self._connect_to_access_point(tenant)375 mac_addr = access_point_ssh.get_mac_address()376 mac_addr = mac_addr.strip().lower()377 # Get the fixed_ips and mac_address fields of all ports. Select378 # only those two columns to reduce the size of the response.379 port_list = self._list_ports(fields=['fixed_ips', 'mac_address'])380 port_detail_list = [381 (port['fixed_ips'][0]['subnet_id'],382 port['fixed_ips'][0]['ip_address'],383 port['mac_address'].lower())384 for port in port_list if port['fixed_ips']385 ]386 server_ip = self._get_server_ip(tenant.access_point)387 subnet_id = tenant.subnet.id388 self.assertIn((subnet_id, server_ip, mac_addr), port_detail_list)389 @test.attr(type='smoke')390 @test.services('compute', 'network')391 def test_cross_tenant_traffic(self):392 try:393 # deploy new tenant394 self._deploy_tenant(self.alt_tenant)395 self._verify_network_details(self.alt_tenant)396 self._verify_mac_addr(self.alt_tenant)397 # cross tenant check398 source_tenant = self.primary_tenant399 dest_tenant = self.alt_tenant400 self._test_cross_tenant_block(source_tenant, dest_tenant)401 self._test_cross_tenant_allow(source_tenant, dest_tenant)402 except Exception:403 for tenant in self.tenants.values():404 self._log_console_output(servers=tenant.servers)405 raise406 @test.attr(type='smoke')407 @test.services('compute', 'network')408 def test_in_tenant_traffic(self):...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run tempest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful