How to use create_empty_security_group method in tempest

Best Python code snippet using tempest_python

manager.py


...478 if security_groups_client is None:479 security_groups_client = self.security_groups_client480 if project_id is None:481 project_id = security_groups_client.project_id482 secgroup = self.create_empty_security_group(483 namestart=namestart, client=security_groups_client,484 project_id=project_id)485 # Add rules to the security group486 rules = self.create_loginable_secgroup_rule(487 security_group_rules_client=security_group_rules_client,488 secgroup=secgroup,489 security_groups_client=security_groups_client)490 for rule in rules:491 self.assertEqual(project_id, rule['project_id'])492 self.assertEqual(secgroup['id'], rule['security_group_id'])493 return secgroup494 def create_empty_security_group(self, client=None, project_id=None,495 namestart='secgroup-smoke'):496 """Create a security group without rules.497 Default rules will be created:498 - IPv4 egress to any499 - IPv6 egress to any500 :param project_id: secgroup will be created in this project501 :returns: the created security group502 """503 if client is None:504 client = self.security_groups_client505 if not project_id:506 project_id = client.project_id507 sg_name = data_utils.rand_name(namestart)508 sg_desc = sg_name + " description"...


test_security_groups_basic_ops.py


# ... excerpt: methods of a scenario test class whose header is not shown;
# the listing's fused line numbers have been dropped and line breaks restored.
def _create_tenant_keypairs(self, tenant):
    """Create a keypair with the tenant's own client and store it on it."""
    keypair = self.create_keypair(tenant.manager.keypairs_client)
    tenant.keypair = keypair

def _create_tenant_security_groups(self, tenant):
    """Create the tenant's 'access' and 'default' security groups.

    Both groups are created empty in the tenant's project. Only the
    access group receives a rule: ingress TCP port 22.
    """
    access_sg = self.create_empty_security_group(
        namestart='secgroup_access-',
        project_id=tenant.creds.project_id,
        client=tenant.manager.security_groups_client
    )
    # don't use default secgroup since it allows in-project traffic
    def_sg = self.create_empty_security_group(
        namestart='secgroup_general-',
        project_id=tenant.creds.project_id,
        client=tenant.manager.security_groups_client
    )
    # The purpose-built group is registered under the key 'default'; the
    # isolation checks below rely on it starting without ingress rules.
    tenant.security_groups.update(access=access_sg, default=def_sg)
    # No remote_ip_prefix or remote_group_id is set here, so the rule as
    # written does not restrict the SSH source.
    ssh_rule = dict(
        protocol='tcp',
        port_range_min=22,
        port_range_max=22,
        direction='ingress',
    )
    sec_group_rules_client = tenant.manager.security_group_rules_client
    self.create_security_group_rule(
        secgroup=access_sg,
        sec_group_rules_client=sec_group_rules_client,
        **ssh_rule)

def _verify_network_details(self, tenant):
    """Assert the tenant's network, subnet and router are listed by admin.

    All listings go through the admin clients (self.os_admin), not the
    tenant's clients.
    """
    # Checks that we see the newly created network/subnet/router via
    # checking the result of list_[networks,routers,subnets]
    # Check that (router, subnet) couple exist in port_list
    seen_nets = self.os_admin.networks_client.list_networks()
    seen_names = [n['name'] for n in seen_nets['networks']]
    seen_ids = [n['id'] for n in seen_nets['networks']]

    self.assertIn(tenant.network['name'], seen_names)
    self.assertIn(tenant.network['id'], seen_ids)

    seen_subnets = [
        (n['id'], n['cidr'], n['network_id']) for n in
        self.os_admin.subnets_client.list_subnets()['subnets']
    ]
    mysubnet = (tenant.subnet['id'], tenant.subnet['cidr'],
                tenant.network['id'])
    self.assertIn(mysubnet, seen_subnets)

    seen_routers = self.os_admin.routers_client.list_routers()
    seen_router_ids = [n['id'] for n in seen_routers['routers']]
    seen_router_names = [n['name'] for n in seen_routers['routers']]

    self.assertIn(tenant.router['name'], seen_router_names)
    self.assertIn(tenant.router['id'], seen_router_ids)

    # One (router id, subnet id) pair per fixed IP of every router
    # interface port attached to the tenant's router.
    myport = (tenant.router['id'], tenant.subnet['id'])
    router_ports = [
        (i['device_id'], f['subnet_id'])
        for i in self.os_admin.ports_client.list_ports(
            device_id=tenant.router['id'])['ports']
        if net_info.is_router_interface_port(i)
        for f in i['fixed_ips']
    ]

    self.assertIn(myport, router_ports)

def _create_server(self, name, tenant, security_groups, **kwargs):
    """Creates a server and assigns it to security group.

    If multi-host is enabled, Ensures servers are created on different
    compute nodes, by storing created servers' ids and uses different_host
    as scheduler_hints on creation.
    Validates servers are created as requested, using admin client.

    :param name: name of the server to create
    :param tenant: tenant whose network, keypair and clients are used
    :param security_groups: iterable of security group dicts; only their
        'name' is passed on to the server request
    :returns: the created server
    """
    security_groups_names = [{'name': s['name']} for s in security_groups]
    if self.multi_node:
        kwargs["scheduler_hints"] = {'different_host': self.servers}
    server = self.create_server(
        name=name,
        networks=[{'uuid': tenant.network["id"]}],
        key_name=tenant.keypair['name'],
        security_groups=security_groups_names,
        clients=tenant.manager,
        **kwargs)
    # The group membership is only compared when the response carries a
    # 'security_groups' key; without it this check is skipped silently.
    if 'security_groups' in server:
        self.assertEqual(
            sorted([s['name'] for s in security_groups]),
            sorted([s['name'] for s in server['security_groups']]))

    # Verify servers are on different compute nodes
    if self.multi_node:
        new_host = self.get_host_for_server(server["id"])
        host_list = [self.get_host_for_server(s) for s in self.servers]
        self.assertNotIn(new_host, host_list,
                         message="Failed to boot servers on different "
                                 "Compute nodes.")

    self.servers.append(server["id"])

    return server

def _create_tenant_servers(self, tenant, num=1):
    """Create `num` servers in the tenant's 'default' security group."""
    for i in range(num):
        name = 'server-{tenant}-gen-{num}'.format(
            tenant=tenant.creds.tenant_name,
            num=i
        )
        name = data_utils.rand_name(name)
        server = self._create_server(name, tenant,
                                     [tenant.security_groups['default']])
        tenant.servers.append(server)

def _set_access_point(self, tenant):
    """Create the tenant's access-point server and give it a floating IP.

    The server is placed in every group in tenant.security_groups.
    """
    # creates a server in a secgroup with rule allowing external ssh
    # in order to access project internal network
    # workaround ip namespace
    secgroups = tenant.security_groups.values()
    name = 'server-{tenant}-access_point'.format(
        tenant=tenant.creds.tenant_name)
    name = data_utils.rand_name(name)
    server = self._create_server(name, tenant,
                                 security_groups=secgroups)
    tenant.access_point = server
    self._assign_floating_ips(tenant, server)

def _assign_floating_ips(self, tenant, server):
    """Create a floating IP for the server on the public network."""
    public_network_id = CONF.network.public_network_id
    floating_ip = self.create_floating_ip(
        server, public_network_id,
        client=tenant.manager.floating_ips_client)
    # setdefault keeps an already recorded floating IP for this server id.
    self.floating_ips.setdefault(server['id'], floating_ip)

def _create_tenant_network(self, tenant, port_security_enabled=True):
    """Create network, subnet and router with the tenant's clients."""
    network, subnet, router = self.setup_network_subnet_with_router(
        networks_client=tenant.manager.networks_client,
        routers_client=tenant.manager.routers_client,
        subnets_client=tenant.manager.subnets_client,
        port_security_enabled=port_security_enabled)
    tenant.set_network(network, subnet, router)

def _deploy_tenant(self, tenant_or_id):
    """creates:

        keypair
        network
        subnet
        router (if public not defined)
        access security group
        access-point server

    :param tenant_or_id: a TenantProperties instance, or a key into
        self.tenants that is resolved to one
    """
    if not isinstance(tenant_or_id, self.TenantProperties):
        tenant = self.tenants[tenant_or_id]
    else:
        tenant = tenant_or_id
    # The security groups must exist before the access point is created,
    # because _set_access_point reads tenant.security_groups.
    self._create_tenant_keypairs(tenant)
    self._create_tenant_network(tenant)
    self._create_tenant_security_groups(tenant)
    self._set_access_point(tenant)

def _get_server_ip(self, server, floating=False):
    """returns the ip (floating/internal) of a server

    Returns None when floating is False and the tenant's network name is
    not among the server's addresses.
    """
    if floating:
        server_ip = self.floating_ips[server['id']]['floating_ip_address']
    else:
        server_ip = None
        network_name = self.tenants[server['tenant_id']].network['name']
        if network_name in server['addresses']:
            server_ip = server['addresses'][network_name][0]['addr']
    # NOTE(review): callers hand this value straight to
    # check_remote_connectivity; with should_succeed=False a None
    # destination could presumably let a "traffic is blocked" check pass
    # for the wrong reason - confirm how that helper treats None.
    return server_ip

def _connect_to_access_point(self, tenant):
    """create ssh connection to tenant access point

    Authenticates with the tenant keypair's private key.
    """
    # access_point_ssh first holds the floating IP address, then is
    # rebound to the remote client built from it.
    access_point_ssh = \
        self.floating_ips[tenant.access_point['id']]['floating_ip_address']
    private_key = tenant.keypair['private_key']
    access_point_ssh = self.get_remote_client(
        access_point_ssh, private_key=private_key,
        server=tenant.access_point)
    return access_point_ssh

def _test_in_tenant_block(self, tenant):
    """Expect no connectivity from the access point to tenant servers."""
    access_point_ssh = self._connect_to_access_point(tenant)
    for server in tenant.servers:
        self.check_remote_connectivity(source=access_point_ssh,
                                       dest=self._get_server_ip(server),
                                       should_succeed=False)

def _test_in_tenant_allow(self, tenant):
    """Allow ICMP within the 'default' group, then expect connectivity."""
    # remote_group_id points at the group itself, so the rule admits ICMP
    # only from members of the same security group.
    ruleset = dict(
        protocol='icmp',
        remote_group_id=tenant.security_groups['default']['id'],
        direction='ingress'
    )
    # NOTE(review): the other call sites in this excerpt pass
    # sec_group_rules_client; here only security_groups_client is given,
    # so the rule is presumably created with the helper's default rules
    # client - confirm it belongs to the same tenant.
    self.create_security_group_rule(
        secgroup=tenant.security_groups['default'],
        security_groups_client=tenant.manager.security_groups_client,
        **ruleset
    )
    access_point_ssh = self._connect_to_access_point(tenant)
    for server in tenant.servers:
        self.check_remote_connectivity(source=access_point_ssh,
                                       dest=self._get_server_ip(server))

def _test_cross_tenant_block(self, source_tenant, dest_tenant, ruleset):
    """Expect no connectivity from source_tenant to dest_tenant.

    Only ruleset['protocol'] is read; no rule is created here.
    """
    # if public router isn't defined, then dest_tenant access is via
    # floating-ip
    protocol = ruleset['protocol']
    access_point_ssh = self._connect_to_access_point(source_tenant)
    ip = self._get_server_ip(dest_tenant.access_point,
                             floating=self.floating_ip_access)
    self.check_remote_connectivity(source=access_point_ssh, dest=ip,
                                   should_succeed=False, protocol=protocol)

def _test_cross_tenant_allow(self, source_tenant, dest_tenant, ruleset):
    """check for each direction:

    creating rule for tenant incoming traffic enables only 1way traffic
    """
    protocol = ruleset['protocol']
    sec_group_rules_client = (
        dest_tenant.manager.security_group_rules_client)
    self.create_security_group_rule(
        secgroup=dest_tenant.security_groups['default'],
        sec_group_rules_client=sec_group_rules_client,
        **ruleset
    )
    access_point_ssh = self._connect_to_access_point(source_tenant)
    ip = self._get_server_ip(dest_tenant.access_point,
                             floating=self.floating_ip_access)
    self.check_remote_connectivity(access_point_ssh, ip, protocol=protocol)

    # test that reverse traffic is still blocked
    self._test_cross_tenant_block(dest_tenant, source_tenant, ruleset)

    # allow reverse traffic and check
    sec_group_rules_client = (
        source_tenant.manager.security_group_rules_client)
    self.create_security_group_rule(
        secgroup=source_tenant.security_groups['default'],
        sec_group_rules_client=sec_group_rules_client,
        **ruleset
    )

    access_point_ssh_2 = self._connect_to_access_point(dest_tenant)
    ip = self._get_server_ip(source_tenant.access_point,
                             floating=self.floating_ip_access)
    self.check_remote_connectivity(access_point_ssh_2, ip,
                                   protocol=protocol)

def _verify_mac_addr(self, tenant):
    """Verify that VM has the same ip, mac as listed in port"""
    access_point_ssh = self._connect_to_access_point(tenant)
    mac_addr = access_point_ssh.get_mac_address()
    # Normalised to lower case to match the lower-cased port MACs below.
    mac_addr = mac_addr.strip().lower()
    # Get the fixed_ips and mac_address fields of all ports. Select
    # only those two columns to reduce the size of the response.
    port_list = self.os_admin.ports_client.list_ports(
        fields=['fixed_ips', 'mac_address'])['ports']
    # Only the first fixed IP of each port is considered; ports without
    # fixed IPs are skipped.
    port_detail_list = [
        (port['fixed_ips'][0]['subnet_id'],
         port['fixed_ips'][0]['ip_address'],
         port['mac_address'].lower())
        for port in port_list if port['fixed_ips']
    ]
    server_ip = self._get_server_ip(tenant.access_point)
    subnet_id = tenant.subnet['id']
    self.assertIn((subnet_id, server_ip, mac_addr), port_detail_list)

def _log_console_output_for_all_tenants(self):
    """Log console output of every tenant's servers and access point."""
    for tenant in self.tenants.values():
        client = tenant.manager.servers_client
        self.log_console_output(servers=tenant.servers, client=client)
        if tenant.access_point is not None:
            self.log_console_output(
                servers=[tenant.access_point], client=client)

def _create_protocol_ruleset(self, protocol, port=80):
    """Build the kwargs of an ingress rule for the given protocol.

    ICMP gets no port range; any other protocol gets the single `port`.
    Neither form sets a remote prefix or remote group.
    """
    if protocol == 'icmp':
        ruleset = dict(protocol='icmp',
                       direction='ingress')
    else:
        ruleset = dict(protocol=protocol,
                       port_range_min=port,
                       port_range_max=port,
                       direction='ingress')
    return ruleset

@decorators.idempotent_id('e79f879e-debb-440c-a7e4-efeda05b6848')
@utils.services('compute', 'network')
def test_cross_tenant_traffic(self):
    """Check traffic between tenants is blocked until a rule allows it.

    Skipped when the credentials provider is not multi-tenant. On any
    failure the console output of all tenants is logged before re-raising.
    """
    if not self.credentials_provider.is_multi_tenant():
        raise self.skipException("No secondary tenant defined")
    try:
        # deploy new project
        self._deploy_tenant(self.alt_tenant)
        self._verify_network_details(self.alt_tenant)
        self._verify_mac_addr(self.alt_tenant)

        # cross tenant check
        source_tenant = self.primary_tenant
        dest_tenant = self.alt_tenant

        protocol = CONF.scenario.protocol
        LOG.debug("Testing cross tenant traffic for %s protocol",
                  protocol)
        # For udp/tcp a listener is started on both access points before
        # the connectivity checks run.
        if protocol in ['udp', 'tcp']:
            for tenant in [source_tenant, dest_tenant]:
                access_point = self._connect_to_access_point(tenant)
                access_point.nc_listen_host(protocol=protocol)

        ruleset = self._create_protocol_ruleset(protocol)
        # Block is asserted first, so the later allow result can be
        # attributed to the rule that _test_cross_tenant_allow adds.
        self._test_cross_tenant_block(source_tenant, dest_tenant, ruleset)
        self._test_cross_tenant_allow(source_tenant, dest_tenant, ruleset)
    except Exception:
        self._log_console_output_for_all_tenants()
        raise

@decorators.idempotent_id('63163892-bbf6-4249-aa12-d5ea1f8f421b')
@utils.services('compute', 'network')
def test_in_tenant_traffic(self):
    """Check in-tenant traffic is blocked until a group rule allows it."""
    try:
        self._create_tenant_servers(self.primary_tenant, num=1)

        # in-tenant check
        self._test_in_tenant_block(self.primary_tenant)
        self._test_in_tenant_allow(self.primary_tenant)
    except Exception:
        self._log_console_output_for_all_tenants()
        raise

@decorators.idempotent_id('f4d556d7-1526-42ad-bafb-6bebf48568f6')
@decorators.attr(type='slow')
@utils.services('compute', 'network')
def test_port_update_new_security_group(self):
    """Verifies the traffic after updating the vm port

    With new security group having appropriate rule.
    """
    new_tenant = self.primary_tenant

    # Create empty security group and add icmp rule in it
    new_sg = self.create_empty_security_group(
        namestart='secgroup_new-',
        project_id=new_tenant.creds.project_id,
        client=new_tenant.manager.security_groups_client)
    icmp_rule = dict(
        protocol='icmp',
        direction='ingress',
    )
    sec_group_rules_client = new_tenant.manager.security_group_rules_client
    self.create_security_group_rule(
        secgroup=new_sg,
        sec_group_rules_client=sec_group_rules_client,
        **icmp_rule)
    new_tenant.security_groups.update(new_sg=new_sg)

    # Create server with default security group...
