How to use _connect_to_access_point method in tempest

Best Python code snippet using tempest_python

test_security_groups_basic_ops.py

Source:test_security_groups_basic_ops.py Github

copy

Full Screen

...271 network_name = self.tenants[server['tenant_id']].network.name272 if network_name in server['addresses']:273 server_ip = server['addresses'][network_name][0]['addr']274 return server_ip275 def _connect_to_access_point(self, tenant):276 """create ssh connection to tenant access point"""277 access_point_ssh = \278 self.floating_ips[tenant.access_point['id']].floating_ip_address279 private_key = tenant.keypair['private_key']280 access_point_ssh = self._ssh_to_server(access_point_ssh,281 private_key=private_key)282 return access_point_ssh283 def _check_connectivity(self, access_point, ip, should_succeed=True):284 if should_succeed:285 msg = "Timed out waiting for %s to become reachable" % ip286 else:287 msg = "%s is reachable" % ip288 self.assertTrue(self._check_remote_connectivity(access_point, ip,289 should_succeed), msg)290 def _test_in_tenant_block(self, tenant):291 access_point_ssh = self._connect_to_access_point(tenant)292 for server in tenant.servers:293 self._check_connectivity(access_point=access_point_ssh,294 ip=self._get_server_ip(server),295 should_succeed=False)296 def _test_in_tenant_allow(self, tenant):297 ruleset = dict(298 protocol='icmp',299 remote_group_id=tenant.security_groups['default'].id,300 direction='ingress'301 )302 self._create_security_group_rule(303 secgroup=tenant.security_groups['default'],304 **ruleset305 )306 access_point_ssh = self._connect_to_access_point(tenant)307 for server in tenant.servers:308 self._check_connectivity(access_point=access_point_ssh,309 ip=self._get_server_ip(server))310 def _test_cross_tenant_block(self, source_tenant, dest_tenant):311 # if public router isn't defined, then dest_tenant access is via312 # floating-ip313 access_point_ssh = self._connect_to_access_point(source_tenant)314 ip = self._get_server_ip(dest_tenant.access_point,315 floating=self.floating_ip_access)316 self._check_connectivity(access_point=access_point_ssh, ip=ip,317 should_succeed=False)318 def _test_cross_tenant_allow(self, source_tenant, dest_tenant):319 """check for each direction:320 creating rule for tenant incoming traffic enables only 1way traffic321 """322 ruleset = dict(323 protocol='icmp',324 direction='ingress'325 )326 self._create_security_group_rule(327 secgroup=dest_tenant.security_groups['default'],328 client=dest_tenant.manager.network_client,329 **ruleset330 )331 access_point_ssh = self._connect_to_access_point(source_tenant)332 ip = self._get_server_ip(dest_tenant.access_point,333 floating=self.floating_ip_access)334 self._check_connectivity(access_point_ssh, ip)335 # test that reverse traffic is still blocked336 self._test_cross_tenant_block(dest_tenant, source_tenant)337 # allow reverse traffic and check338 self._create_security_group_rule(339 secgroup=source_tenant.security_groups['default'],340 client=source_tenant.manager.network_client,341 **ruleset342 )343 access_point_ssh_2 = self._connect_to_access_point(dest_tenant)344 ip = self._get_server_ip(source_tenant.access_point,345 floating=self.floating_ip_access)346 self._check_connectivity(access_point_ssh_2, ip)347 def _verify_mac_addr(self, tenant):348 """Verify that VM has the same ip, mac as listed in port"""349 access_point_ssh = self._connect_to_access_point(tenant)350 mac_addr = access_point_ssh.get_mac_address()351 mac_addr = mac_addr.strip().lower()352 # Get the fixed_ips and mac_address fields of all ports. Select353 # only those two columns to reduce the size of the response.354 port_list = self._list_ports(fields=['fixed_ips', 'mac_address'])355 port_detail_list = [356 (port['fixed_ips'][0]['subnet_id'],357 port['fixed_ips'][0]['ip_address'],358 port['mac_address'].lower())359 for port in port_list if port['fixed_ips']360 ]361 server_ip = self._get_server_ip(tenant.access_point)362 subnet_id = tenant.subnet.id363 self.assertIn((subnet_id, server_ip, mac_addr), port_detail_list)364 @test.idempotent_id('e79f879e-debb-440c-a7e4-efeda05b6848')365 @test.services('compute', 'network')366 def test_cross_tenant_traffic(self):367 if not self.credentials_provider.is_multi_tenant():368 raise self.skipException("No secondary tenant defined")369 try:370 # deploy new tenant371 self._deploy_tenant(self.alt_tenant)372 self._verify_network_details(self.alt_tenant)373 self._verify_mac_addr(self.alt_tenant)374 # cross tenant check375 source_tenant = self.primary_tenant376 dest_tenant = self.alt_tenant377 self._test_cross_tenant_block(source_tenant, dest_tenant)378 self._test_cross_tenant_allow(source_tenant, dest_tenant)379 except Exception:380 for tenant in self.tenants.values():381 self._log_console_output(servers=tenant.servers)382 raise383 @test.idempotent_id('63163892-bbf6-4249-aa12-d5ea1f8f421b')384 @test.services('compute', 'network')385 def test_in_tenant_traffic(self):386 try:387 self._create_tenant_servers(self.primary_tenant, num=1)388 # in-tenant check389 self._test_in_tenant_block(self.primary_tenant)390 self._test_in_tenant_allow(self.primary_tenant)391 except Exception:392 for tenant in self.tenants.values():393 self._log_console_output(servers=tenant.servers)394 raise395 @test.idempotent_id('f4d556d7-1526-42ad-bafb-6bebf48568f6')396 @test.services('compute', 'network')397 def test_port_update_new_security_group(self):398 """Verifies the traffic after updating the vm port399 With new security group having appropriate rule.400 """401 new_tenant = self.primary_tenant402 # Create empty security group and add icmp rule in it403 new_sg = self._create_empty_security_group(404 namestart='secgroup_new-',405 tenant_id=new_tenant.creds.tenant_id,406 client=new_tenant.manager.security_groups_client)407 icmp_rule = dict(408 protocol='icmp',409 direction='ingress',410 )411 self._create_security_group_rule(412 secgroup=new_sg,413 client=new_tenant.manager.network_client,414 **icmp_rule)415 new_tenant.security_groups.update(new_sg=new_sg)416 # Create server with default security group417 name = 'server-{tenant}-gen-1'.format(418 tenant=new_tenant.creds.tenant_name419 )420 name = data_utils.rand_name(name)421 server = self._create_server(name, new_tenant)422 # Check connectivity failure with default security group423 try:424 access_point_ssh = self._connect_to_access_point(new_tenant)425 self._check_connectivity(access_point=access_point_ssh,426 ip=self._get_server_ip(server),427 should_succeed=False)428 server_id = server['id']429 port_id = self._list_ports(device_id=server_id)[0]['id']430 # update port with new security group and check connectivity431 self.ports_client.update_port(port_id, security_groups=[432 new_tenant.security_groups['new_sg'].id])433 self._check_connectivity(434 access_point=access_point_ssh,435 ip=self._get_server_ip(server))436 except Exception:437 for tenant in self.tenants.values():438 self._log_console_output(servers=tenant.servers)439 raise440 @test.idempotent_id('d2f77418-fcc4-439d-b935-72eca704e293')441 @test.services('compute', 'network')442 def test_multiple_security_groups(self):443 """Verify multiple security groups and checks that rules444 provided in the both the groups is applied onto VM445 """446 tenant = self.primary_tenant447 ip = self._get_server_ip(tenant.access_point,448 floating=self.floating_ip_access)449 ssh_login = CONF.validation.image_ssh_user450 private_key = tenant.keypair['private_key']451 self.check_vm_connectivity(ip,452 should_connect=False)453 ruleset = dict(454 protocol='icmp',455 direction='ingress'456 )457 self._create_security_group_rule(458 secgroup=tenant.security_groups['default'],459 **ruleset460 )461 # NOTE: Vm now has 2 security groups one with ssh rule(462 # already added in setUp() method),and other with icmp rule463 # (added in the above step).The check_vm_connectivity tests464 # -that vm ping test is successful465 # -ssh to vm is successful466 self.check_vm_connectivity(ip,467 username=ssh_login,468 private_key=private_key,469 should_connect=True)470 @test.requires_ext(service='network', extension='port-security')471 @test.idempotent_id('7c811dcc-263b-49a3-92d2-1b4d8405f50c')472 @test.services('compute', 'network')473 def test_port_security_disable_security_group(self):474 """Verify the default security group rules is disabled."""475 new_tenant = self.primary_tenant476 # Create server477 name = 'server-{tenant}-gen-1'.format(478 tenant=new_tenant.creds.tenant_name479 )480 name = data_utils.rand_name(name)481 server = self._create_server(name, new_tenant)482 access_point_ssh = self._connect_to_access_point(new_tenant)483 server_id = server['id']484 port_id = self._list_ports(device_id=server_id)[0]['id']485 # Flip the port's port security and check connectivity486 try:487 self.ports_client.update_port(port_id,488 port_security_enabled=True,489 security_groups=[])490 self._check_connectivity(access_point=access_point_ssh,491 ip=self._get_server_ip(server),492 should_succeed=False)493 self.ports_client.update_port(port_id,494 port_security_enabled=False,495 security_groups=[])496 self._check_connectivity(...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run tempest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful